diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java index caacc22debee..63bcb70d4391 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java @@ -102,6 +102,10 @@ public boolean isFinished() { return isSuccess() || isFailure() || isStop() || isPause(); } + public boolean canFailover() { + return !isFinished() && !(this == SERIAL_WAIT || this == WAIT_TO_RUN); + } + /** * status is success * diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java index f85200b7a2c6..79eec5e61ed1 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; @RpcService public interface ILogicTaskInstanceOperator { @@ -38,4 +40,7 @@ public interface ILogicTaskInstanceOperator { @RpcMethod 
LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest); + @RpcMethod + LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest); + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java new file mode 100644 index 000000000000..3381c48f7f4d --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.extract.master.transportor; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class LogicTaskTakeOverRequest implements Serializable { + + private static final long serialVersionUID = -1L; + + private TaskExecutionContext taskExecutionContext; +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java new file mode 100644 index 000000000000..e800fb09e5b3 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.extract.master.transportor; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class LogicTaskTakeOverResponse { + + private Integer taskInstanceId; + + private boolean success; + + private String message; + + public static LogicTaskTakeOverResponse success(Integer taskInstanceId) { + return new LogicTaskTakeOverResponse(taskInstanceId, true, "take over success"); + } + + public static LogicTaskTakeOverResponse failed(Integer taskInstanceId, String message) { + return new LogicTaskTakeOverResponse(taskInstanceId, false, message); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java index 774cec05a2d9..3bc3b588c5c0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java @@ -25,6 +25,7 @@ import org.apache.commons.collections4.CollectionUtils; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -68,7 +69,7 @@ public void startScheduleThread() { } public void subscribeWorkerGroupsChange(WorkerGroupListener listener) { - + // add all group when listener added listener.onWorkerGroupAdd(new ArrayList<>(workerGroupMap.values())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java index 506fcdffb72a..a5999e293659 100644 --- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java @@ -93,6 +93,8 @@ protected void assembleWorkflowInstance( throw new IllegalArgumentException( "The WorkflowFailoverCommandParam: " + command.getCommandParam() + " is invalid"); } + workflowInstance.setCommandType(command.getCommandType()); + workflowInstance.addHistoryCmd(command.getCommandType()); workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus()); workflowInstanceDao.updateById(workflowInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java index 0c2bf92d108f..79baeaac2c0c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.engine.graph; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; @@ -252,7 +253,7 @@ public boolean isTaskExecutionRunnableSkipped(final ITaskExecutionRunnable taskE @Override public boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable taskExecutionRunnable) { - return false; + return taskExecutionRunnable.getTaskDefinition().getFlag() == Flag.NO; } /** * diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java index 1c364a7e224f..a05f58f1cd33 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java @@ -25,11 +25,18 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation; +import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceRelationMapper; import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; @@ -157,29 +164,69 @@ private void 
initializeTaskExecutionContext() { .createTaskExecutionContext(request); } + private WorkflowInstance getValidSubWorkflowInstance() { + + WorkflowInstanceRelation workflowInstanceRelation = + applicationContext.getBean(WorkflowInstanceRelationMapper.class) + .queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId()); + if (workflowInstanceRelation == null || workflowInstanceRelation.getWorkflowInstanceId() == 0) { + return null; + } + + WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class) + .queryDetailById(workflowInstanceRelation.getWorkflowInstanceId()); + + if (workflowInstance == null || !workflowInstance.getState().canFailover()) { + return null; + } + + return workflowInstance; + } + private boolean takeOverTaskFromExecutor() { checkState(isTaskInstanceInitialized(), "The task instance is null, can't take over from executor."); - if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) { - return false; - } if (StringUtils.isEmpty(taskInstance.getHost())) { log.debug("Task: {} host is empty, cannot take over the task from executor(This is normal case).", taskInstance.getName()); return false; } - try { - final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() - .taskInstanceId(taskInstance.getId()) - .workflowHost(applicationContext.getBean(MasterConfig.class).getMasterAddress()) - .build(); - final TakeOverTaskResponse takeOverTaskResponse = Clients - .withService(ITaskInstanceOperator.class) - .withHost(taskInstance.getHost()) - .takeOverTask(takeOverTaskRequest); - return takeOverTaskResponse.isSuccess(); - } catch (Exception ex) { - log.warn("Take over task: {} failed", taskInstance.getName(), ex); - return false; + + if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) { + if (!taskInstance.getTaskType().equals(SubWorkflowLogicTaskChannelFactory.NAME)) { + return false; + } + final WorkflowInstance subWorkflowInstance = getValidSubWorkflowInstance(); + if 
(subWorkflowInstance == null) { + return false; + } + try { + final LogicTaskTakeOverRequest takeOverTaskRequest = + new LogicTaskTakeOverRequest(taskExecutionContext); + + final LogicTaskTakeOverResponse takeOverTaskResponse = Clients + .withService(ILogicTaskInstanceOperator.class) + .withHost(taskInstance.getHost()) + .takeOverLogicTask(takeOverTaskRequest); + return takeOverTaskResponse.isSuccess(); + } catch (Exception ex) { + log.warn("Take over logic task: {} failed", taskInstance.getName(), ex); + return false; + } + } else { + try { + final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() + .taskInstanceId(taskInstance.getId()) + .workflowHost(applicationContext.getBean(MasterConfig.class).getMasterAddress()) + .build(); + final TakeOverTaskResponse takeOverTaskResponse = Clients + .withService(ITaskInstanceOperator.class) + .withHost(taskInstance.getHost()) + .takeOverTask(takeOverTaskRequest); + return takeOverTaskResponse.isSuccess(); + } catch (Exception ex) { + log.warn("Take over task: {} failed", taskInstance.getName(), ex); + return false; + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java new file mode 100644 index 000000000000..35432d54ab24 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.rpc; + +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager; +import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class LogicITaskInstanceTakeOverOperationFunction + implements + ITaskInstanceOperationFunction { + + @Autowired + private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder; + + @Autowired + private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool; + + @Autowired + private LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager; + + @Override + public LogicTaskTakeOverResponse operate(LogicTaskTakeOverRequest taskTakeoverRequest) { + 
log.info("Received takeOverLogicTask request: {}", taskTakeoverRequest); + TaskExecutionContext taskExecutionContext = taskTakeoverRequest.getTaskExecutionContext(); + try { + final int taskInstanceId = taskExecutionContext.getTaskInstanceId(); + final int workflowInstanceId = taskExecutionContext.getWorkflowInstanceId(); + final String taskInstanceName = taskExecutionContext.getTaskName(); + + taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); + + LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId); + LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); + + MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext); + + final MasterTaskExecutor masterTaskExecutor = masterTaskExecutorFactoryBuilder + .createMasterTaskExecutorFactory(taskExecutionContext.getTaskType()) + .createMasterTaskExecutor(taskExecutionContext); + + if (masterTaskExecutorThreadPool.takeOverMasterTaskExecutor(masterTaskExecutor)) { + log.info("Take over LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName); + return LogicTaskTakeOverResponse.success(taskInstanceId); + } else { + log.error("Take over LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName); + return LogicTaskTakeOverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full"); + } + } finally { + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + LogUtils.removeTaskInstanceLogFullPathMDC(); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java index de4095831e9b..fe5e8a1704dc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java @@ -32,6 +32,9 @@ public class LogicTaskInstanceOperationFunctionManager { @Autowired private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction; + @Autowired + private LogicITaskInstanceTakeOverOperationFunction logicITaskInstanceTakeOverOperationFunction; + public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() { return logicITaskInstanceDispatchOperationFunction; } @@ -44,4 +47,8 @@ public LogicITaskInstancePauseOperationFunction getLogicTaskInstancePauseOperati return logicITaskInstancePauseOperationFunction; } + public LogicITaskInstanceTakeOverOperationFunction getLogicTaskInstanceTakeOverOperationFunction() { + return logicITaskInstanceTakeOverOperationFunction; + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java index 39446bd89d26..8ecf353040f6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java @@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; import lombok.extern.slf4j.Slf4j; @@ -55,4 +57,10 @@ public LogicTaskPauseResponse 
pauseLogicTask(LogicTaskPauseRequest taskPauseRequ .operate(taskPauseRequest); } + @Override + public LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest) { + return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeOverOperationFunction() + .operate(taskTakeOverRequest); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java index 9c30d49f8fb2..5c7f04d319a4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner.execute; import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; +import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; import lombok.extern.slf4j.Slf4j; @@ -51,6 +52,23 @@ public boolean submitMasterTaskExecutor(final MasterTaskExecutor masterTaskExecu throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor); } + public boolean takeOverMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) { + if (!(masterTaskExecutor.getLogicTask() instanceof SubWorkflowLogicTask)) { + throw new IllegalArgumentException( + "Only SubWorkflowLogicTask can be take over: " + masterTaskExecutor.getLogicTask()); + } + MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor); + if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { + return masterSyncTaskExecutorThreadPool + .submitMasterTaskExecutor((SyncMasterTaskExecutor) 
masterTaskExecutor); + } + if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) { + return masterAsyncTaskExecutorThreadPool + .submitMasterTaskExecutor((AsyncMasterTaskExecutor) masterTaskExecutor); + } + throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor); + } + public boolean removeMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) { if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { return masterSyncTaskExecutorThreadPool diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowControlClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowControlClient.java index 69109f09dcdc..1452b6507344 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowControlClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowControlClient.java @@ -30,11 +30,11 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; -import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRecoverFailureTaskTrigger; import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRecoverSuspendTaskTrigger; import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; -import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger.SubWorkflowManualTrigger; +import 
org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger.SubWorkflowTrigger; +import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger.SubWorkflowTriggerRequest; import lombok.extern.slf4j.Slf4j; @@ -49,7 +49,7 @@ public class SubWorkflowControlClient { private WorkflowInstanceDao workflowInstanceDao; @Autowired - private SubWorkflowManualTrigger subWorkflowManualTrigger; + private SubWorkflowTrigger subWorkflowTrigger; @Autowired private WorkflowInstanceRecoverFailureTaskTrigger workflowInstanceRecoverFailureTaskTrigger; @@ -57,8 +57,8 @@ public class SubWorkflowControlClient { @Autowired private WorkflowInstanceRecoverSuspendTaskTrigger workflowInstanceRecoverSuspendTaskTrigger; - public Integer triggerSubWorkflow(final WorkflowManualTriggerRequest workflowManualTriggerRequest) { - return subWorkflowManualTrigger.triggerWorkflow(workflowManualTriggerRequest).getWorkflowInstanceId(); + public Integer triggerSubWorkflow(final SubWorkflowTriggerRequest subWorkflowTriggerRequest) { + return subWorkflowTrigger.triggerWorkflow(subWorkflowTriggerRequest).getWorkflowInstanceId(); } public WorkflowInstanceRecoverFailureTasksResponse triggerFromFailureTasks( diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java index d5f21bfc5398..6e923ccbcc2c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.task.subworkflow; +import org.apache.dolphinscheduler.common.enums.CommandType; import 
org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; @@ -32,7 +33,6 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; -import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.SubWorkflowParameters; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; @@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction; import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; import org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask; +import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger.SubWorkflowTriggerRequest; import lombok.extern.slf4j.Slf4j; @@ -71,14 +72,21 @@ public SubWorkflowLogicTask(final TaskExecutionContext taskExecutionContext, @Override public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() { - subWorkflowLogicTaskRuntimeContext = initializeSubWorkflowInstance(); - upsertSubWorkflowRelation(); - taskExecutionContext.setAppIds(JSONUtils.toJsonString(subWorkflowLogicTaskRuntimeContext)); - - applicationContext - .getBean(LogicTaskInstanceExecutionEventSenderManager.class) - .runningEventSender() - .sendMessage(taskExecutionContext); + // If the sub-workflow instance already exists and is in the failover process, + // take over the task without creating a new sub-workflow instance. + if 
(subWorkflowLogicTaskRuntimeContext == null + || workflowExecutionRunnable.getWorkflowInstance() + .getCommandType() != CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) { + + subWorkflowLogicTaskRuntimeContext = initializeSubWorkflowInstance(); + upsertSubWorkflowRelation(); + taskExecutionContext.setAppIds(JSONUtils.toJsonString(subWorkflowLogicTaskRuntimeContext)); + + applicationContext + .getBean(LogicTaskInstanceExecutionEventSenderManager.class) + .runningEventSender() + .sendMessage(taskExecutionContext); + } return new SubWorkflowAsyncTaskExecuteFunction( subWorkflowLogicTaskRuntimeContext, @@ -183,7 +191,8 @@ private SubWorkflowLogicTaskRuntimeContext triggerNewSubWorkflow() { final ICommandParam commandParam = JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class); - final WorkflowManualTriggerRequest workflowManualTriggerRequest = WorkflowManualTriggerRequest.builder() + final SubWorkflowTriggerRequest workflowManualTriggerRequest = SubWorkflowTriggerRequest.builder() + .scheduleTIme(workflowInstance.getScheduleTime()) .userId(taskExecutionContext.getExecutorId()) .workflowDefinitionCode(subWorkflowDefinition.getCode()) .workflowDefinitionVersion(subWorkflowDefinition.getVersion()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowManualTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowManualTrigger.java deleted file mode 100644 index 5484de2128dc..000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowManualTrigger.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; - -import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; -import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowManualTrigger; - -import org.springframework.stereotype.Component; - -/** - * Manual trigger of the workflow, used to trigger the workflow and generate the workflow instance in the manual way. 
- */ -@Component -public class SubWorkflowManualTrigger extends WorkflowManualTrigger { - - @Override - protected WorkflowInstance constructWorkflowInstance(final WorkflowManualTriggerRequest workflowManualTriggerRequest) { - final WorkflowInstance workflowInstance = super.constructWorkflowInstance(workflowManualTriggerRequest); - workflowInstance.setIsSubWorkflow(Flag.YES); - return workflowInstance; - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java new file mode 100644 index 000000000000..00b592ca7a9d --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; +import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam; +import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.AbstractWorkflowTrigger; + +import org.apache.commons.lang3.ObjectUtils; + +import java.util.Date; + +import org.springframework.stereotype.Component; + +/** + * Used to trigger the sub-workflow and generate the workflow instance. It is a Spring {@code @Component} extending {@link AbstractWorkflowTrigger}: it builds a {@link WorkflowInstance} flagged as a sub-workflow and the {@code START_PROCESS} {@link Command} for it. NOTE(review): the supertype is shown without type arguments although the overrides take SubWorkflowTriggerRequest and return SubWorkflowTriggerResponse - confirm the type parameters against the real source.
+ */ +@Component +public class SubWorkflowTrigger + extends + AbstractWorkflowTrigger { + /** Builds the {@link WorkflowInstance} for a sub-workflow run: resolves the definition by the requested code and version, uses {@code START_PROCESS} as command type with state {@code SUBMITTED_SUCCESS}, sets start and restart time to the current time, names the instance from the definition name and the current timestamp joined by a hyphen, falls back to {@code WarningType.NONE} when the request carries no warning type, takes the timeout from the definition and marks the instance with {@code Flag.YES} as a sub-workflow. NOTE(review): getDryRun() and getTestFlag() are dereferenced without a null check - confirm callers never leave them null. */ + @Override + protected WorkflowInstance constructWorkflowInstance(final SubWorkflowTriggerRequest subWorkflowTriggerRequest) { + final CommandType commandType = CommandType.START_PROCESS; + final Long workflowCode = subWorkflowTriggerRequest.getWorkflowDefinitionCode(); + final Integer workflowVersion = subWorkflowTriggerRequest.getWorkflowDefinitionVersion(); + final WorkflowDefinition workflowDefinition = getProcessDefinition(workflowCode, workflowVersion); + /* Populate a fresh instance from the resolved definition and the request. */ + final WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode()); + workflowInstance.setWorkflowDefinitionVersion(workflowDefinition.getVersion()); + workflowInstance.setProjectCode(workflowDefinition.getProjectCode()); + workflowInstance.setCommandType(commandType); + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, commandType.name()); + workflowInstance.setRecovery(Flag.NO); + workflowInstance.setScheduleTime(subWorkflowTriggerRequest.getScheduleTIme()); + workflowInstance.setStartTime(new Date()); + workflowInstance.setRestartTime(workflowInstance.getStartTime()); + workflowInstance.setRunTimes(1); + workflowInstance.setName(String.join("-", workflowDefinition.getName(), DateUtils.getCurrentTimeStamp())); + workflowInstance.setTaskDependType(subWorkflowTriggerRequest.getTaskDependType()); + workflowInstance.setFailureStrategy(subWorkflowTriggerRequest.getFailureStrategy()); + workflowInstance.setWarningType( + ObjectUtils.defaultIfNull(subWorkflowTriggerRequest.getWarningType(), WarningType.NONE)); + workflowInstance.setWarningGroupId(subWorkflowTriggerRequest.getWarningGroupId()); + workflowInstance.setExecutorId(subWorkflowTriggerRequest.getUserId()); + workflowInstance.setExecutorName(getExecutorUser(subWorkflowTriggerRequest.getUserId()).getUserName()); +
workflowInstance.setTenantCode(subWorkflowTriggerRequest.getTenantCode()); + workflowInstance.setIsSubWorkflow(Flag.YES); + workflowInstance.addHistoryCmd(commandType); + workflowInstance.setWorkflowInstancePriority(subWorkflowTriggerRequest.getWorkflowInstancePriority()); + workflowInstance.setWorkerGroup( + WorkerGroupUtils.getWorkerGroupOrDefault(subWorkflowTriggerRequest.getWorkerGroup())); + workflowInstance.setEnvironmentCode( + EnvironmentUtils.getEnvironmentCodeOrDefault(subWorkflowTriggerRequest.getEnvironmentCode())); + workflowInstance.setTimeout(workflowDefinition.getTimeout()); + workflowInstance.setDryRun(subWorkflowTriggerRequest.getDryRun().getCode()); + workflowInstance.setTestFlag(subWorkflowTriggerRequest.getTestFlag().getCode()); + return workflowInstance; + } + /** Builds the {@code START_PROCESS} {@link Command} for the given instance: the start parameters and start nodes of the request together with {@code DateUtils.getTimezone()} are wrapped in a {@link RunWorkflowCommandParam} and stored as JSON in the command parameter, while the instance id and priority are read from {@code workflowInstance}. */ + @Override + protected Command constructTriggerCommand(final SubWorkflowTriggerRequest subWorkflowTriggerRequest, + final WorkflowInstance workflowInstance) { + final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder() + .commandParams(subWorkflowTriggerRequest.getStartParamList()) + .startNodes(subWorkflowTriggerRequest.getStartNodes()) + .timeZone(DateUtils.getTimezone()) + .build(); + return Command.builder() + .commandType(CommandType.START_PROCESS) + .workflowDefinitionCode(subWorkflowTriggerRequest.getWorkflowDefinitionCode()) + .workflowDefinitionVersion(subWorkflowTriggerRequest.getWorkflowDefinitionVersion()) + .workflowInstanceId(workflowInstance.getId()) + .workflowInstancePriority(workflowInstance.getWorkflowInstancePriority()) + .commandParam(JSONUtils.toJsonString(runWorkflowCommandParam)) + .build(); + } + /** Wraps the id of the given workflow instance in a successful {@link SubWorkflowTriggerResponse}. */ + @Override + protected SubWorkflowTriggerResponse onTriggerSuccess(WorkflowInstance workflowInstance) { + return SubWorkflowTriggerResponse.success(workflowInstance.getId()); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java new file mode 100644 index 000000000000..056a643e4ed2 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; + +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + /** Parameters for triggering one sub-workflow instance. Lombok generates the accessors, the builder and the no-args and all-args constructors; fields annotated with Builder.Default keep the shown initializer when the builder does not set them. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SubWorkflowTriggerRequest { + /* NOTE(review): scheduleTIme looks like a misspelling of scheduleTime. The Lombok-generated names (getScheduleTIme, builder method scheduleTIme) are used elsewhere in this change, so a rename has to cover every usage at once. */ + private Date scheduleTIme; + + private Integer userId; + + private Long workflowDefinitionCode; + + private Integer workflowDefinitionVersion; + /* NOTE(review): raw List - the element type is not visible here; confirm it against the real source. */ + private List startNodes; + + @Builder.Default + private FailureStrategy failureStrategy = FailureStrategy.CONTINUE; + + @Builder.Default + private TaskDependType taskDependType = TaskDependType.TASK_POST; + + @Builder.Default + private WarningType warningType = WarningType.NONE; + + private Integer warningGroupId; + + @Builder.Default + private Priority workflowInstancePriority = Priority.MEDIUM; + + private String workerGroup; + + private String tenantCode; + + private Long environmentCode; + /* NOTE(review): raw List; Property is imported but not referenced anywhere else in this class, so the element type was presumably Property - confirm. */ + @Builder.Default + private List startParamList = new ArrayList<>(); + + @Builder.Default + private Flag dryRun = Flag.NO; + + @Builder.Default + private Flag testFlag = Flag.NO; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java new file mode 100644 index 000000000000..1935ddd75eb8 --- /dev/null +++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + /** Outcome of a sub-workflow trigger attempt: a success flag, a message and the id of the workflow instance. Lombok generates the accessors, the builder and the constructors. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SubWorkflowTriggerResponse { + + private boolean success; + + private String message; + + private Integer workflowInstanceId; + /** Creates a response with success set to false and the given message; this factory does not set workflowInstanceId. */ + public static SubWorkflowTriggerResponse fail(String message) { + return SubWorkflowTriggerResponse.builder() + .success(false) + .message(message) + .build(); + } + /** Creates a response with success set to true and the given workflow instance id; this factory does not set message. */ + public static SubWorkflowTriggerResponse success(Integer workflowInstanceId) { + return SubWorkflowTriggerResponse.builder() + .success(true) + .workflowInstanceId(workflowInstanceId) + .build(); + } + +}