From 38ff4932d2f4277f9a11ad8e6c6a35faf4cb070a Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sun, 28 Jan 2024 13:32:24 +0800 Subject: [PATCH] Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time --- .../api/controller/TaskGroupController.java | 26 +- .../api/service/TaskGroupService.java | 8 - .../api/service/impl/ExecutorServiceImpl.java | 49 +-- .../impl/TaskGroupQueueServiceImpl.java | 18 +- .../service/impl/TaskGroupServiceImpl.java | 11 - .../dao/mapper/TaskGroupMapper.java | 29 +- .../dao/mapper/TaskGroupQueueMapper.java | 17 + .../dao/repository/TaskGroupDao.java | 32 +- .../dao/repository/TaskGroupQueueDao.java | 30 +- .../dao/repository/impl/TaskGroupDaoImpl.java | 64 ++++ .../impl/TaskGroupQueueDaoImpl.java | 76 ++++ .../dao/mapper/TaskGroupMapper.xml | 58 ++- .../dao/mapper/TaskGroupQueueMapper.xml | 61 ++- .../resources/sql/dolphinscheduler_h2.sql | 1 + .../resources/sql/dolphinscheduler_mysql.sql | 1 + .../sql/dolphinscheduler_postgresql.sql | 2 + .../mysql/dolphinscheduler_ddl.sql | 4 +- .../postgresql/dolphinscheduler_ddl.sql | 4 +- .../mysql/dolphinscheduler_ddl.sql | 4 +- .../postgresql/dolphinscheduler_ddl.sql | 4 +- .../master/ILogicTaskInstanceOperator.java | 9 - .../master/IWorkflowInstanceService.java | 5 + .../TaskInstanceWakeupRequest.java | 19 +- .../server/master/MasterServer.java | 5 + .../event/StateEventHandlerManager.java | 10 +- .../master/event/TaskStateEventHandler.java | 11 +- .../event/TaskWaitTaskGroupStateHandler.java | 47 --- ...cTaskInstanceOperationFunctionManager.java | 13 - .../rpc/LogicTaskInstanceOperatorImpl.java | 14 - ...skInstanceForceStartOperationFunction.java | 58 --- .../TaskInstanceWakeupOperationFunction.java | 36 +- .../rpc/WorkflowInstanceServiceImpl.java | 10 + .../runner/WorkflowExecuteRunnable.java | 147 ++----- .../WorkflowExecuteRunnableFactory.java | 9 +- .../taskgroup/TaskGroupCoordinator.java | 358 ++++++++++++++++++ .../runner/WorkflowExecuteRunnableTest.java | 11 +- 
.../registry/api/enums/RegistryNodeType.java | 1 + .../service/process/ProcessService.java | 33 -- .../service/process/ProcessServiceImpl.java | 315 --------------- .../service/process/ProcessServiceTest.java | 16 - 40 files changed, 807 insertions(+), 819 deletions(-) rename dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java => dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java (57%) rename dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java => dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java (52%) create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java create mode 100644 dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java index cb0d15909ec7d..5d8735fbaa2cc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java @@ -298,7 
+298,7 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value = * @param pageSize page size * @return queue list */ - @Operation(summary = "queryTasksByGroupId", description = "QUERY_ALL_TASKS_GROUP_NOTES") + @Operation(summary = "queryTaskGroupQueuesByGroupId", description = "QUERY_TASKS_GROUP_GROUP_QUEUES") @Parameters({ @Parameter(name = "groupId", description = "GROUP_ID", required = false, schema = @Schema(implementation = int.class, example = "1", defaultValue = "-1")), @Parameter(name = "taskInstanceName", description = "TASK_INSTANCE_NAME", required = false, schema = @Schema(implementation = String.class, example = "taskName")), @@ -310,15 +310,21 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value = @GetMapping(value = "/query-list-by-group-id") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR) - public Result queryTasksByGroupId(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId, - @RequestParam(value = "taskInstanceName", required = false) String taskName, - @RequestParam(value = "processInstanceName", required = false) String processName, - @RequestParam(value = "status", required = false) Integer status, - @RequestParam("pageNo") Integer pageNo, - @RequestParam("pageSize") Integer pageSize) { - Map result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName, processName, status, - groupId, pageNo, pageSize); + public Result queryTaskGroupQueues(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId, + @RequestParam(value = "taskInstanceName", required = false) String taskName, + @RequestParam(value = "processInstanceName", required = false) String processName, + @RequestParam(value = "status", required 
= false) Integer status, + @RequestParam("pageNo") Integer pageNo, + @RequestParam("pageSize") Integer pageSize) { + Map result = taskGroupQueueService.queryTasksByGroupId( + loginUser, + taskName, + processName, + status, + groupId, + pageNo, + pageSize); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java index db909da15816d..885e23cf70f52 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java @@ -50,14 +50,6 @@ Map createTaskGroup(User loginUser, Long projectCode, String nam Map updateTaskGroup(User loginUser, int id, String name, String description, int groupSize); - /** - * get task group status - * - * @param id task group id - * @return the result code and msg - */ - boolean isTheTaskGroupAvailable(int id); - /** * query all task group by user id * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 25eca41158d69..4cbef4c4e89bf 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -55,7 +55,6 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; import org.apache.dolphinscheduler.common.enums.TaskDependType; -import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.WarningType; import 
org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.model.Server; @@ -83,14 +82,12 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; import org.apache.dolphinscheduler.extract.master.IStreamingTaskOperator; import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener; import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService; import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto; import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest; import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest; import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.service.command.CommandService; @@ -552,16 +549,20 @@ public Map forceStartTaskInstance(User loginUser, int queueId) { Map result = new HashMap<>(); TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId); // check process instance exist - ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId()); - if (processInstance == null) { - log.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", - taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId()); - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()); - return result; + processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId()) + .orElseThrow( + () -> new 
ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId())); + checkMasterExists(); + + if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) { + throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START); } + taskGroupQueue.setForceStart(Flag.YES.getCode()); + taskGroupQueue.setUpdateTime(new Date()); + taskGroupQueueMapper.updateById(taskGroupQueue); - checkMasterExists(); - return forceStart(processInstance, taskGroupQueue); + result.put(Constants.STATUS, Status.SUCCESS); + return result; } public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) { @@ -664,32 +665,6 @@ private Map updateProcessInstancePrepare(ProcessInstance process return result; } - /** - * prepare to update process instance command type and status - * - * @param processInstance process instance - * @return update result - */ - private Map forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) { - Map result = new HashMap<>(); - if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) { - log.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId()); - putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START); - return result; - } - - taskGroupQueue.setForceStart(Flag.YES.getCode()); - taskGroupQueue.setUpdateTime(new Date()); - processService.updateTaskGroupQueue(taskGroupQueue); - log.info("Sending force start command to master: {}.", processInstance.getHost()); - ILogicTaskInstanceOperator iLogicTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class); - iLogicTaskInstanceOperator.forceStartTaskInstance( - new TaskInstanceForceStartRequest(processInstance.getId(), taskGroupQueue.getTaskId())); - putMsg(result, Status.SUCCESS); - return result; - } - /** * check whether sub processes are offline before starting process definition * diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java index d33115479f594..02c1bea4dca20 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java @@ -66,8 +66,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr * @return tasks list */ @Override - public Map queryTasksByGroupId(User loginUser, String taskName, String processName, Integer status, - int groupId, int pageNo, int pageSize) { + public Map queryTasksByGroupId(User loginUser, + String taskName, + String processName, + Integer status, + int groupId, + int pageNo, + int pageSize) { Map result = new HashMap<>(); Page page = new Page<>(pageNo, pageSize); PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); @@ -79,8 +84,13 @@ public Map queryTasksByGroupId(User loginUser, String taskName, return result; } List projects = projectMapper.selectBatchIds(projectIds); - IPage taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page, - taskName, processName, status, groupId, projects); + IPage taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging( + page, + taskName, + processName, + status, + groupId, + projects); pageInfo.setTotal((int) taskGroupQueue.getTotal()); pageInfo.setTotalList(taskGroupQueue.getRecords()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java index 3122d090e2d49..904f007f4898b 100644 --- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java @@ -202,17 +202,6 @@ public Map updateTaskGroup(User loginUser, int id, String name, return result; } - /** - * get task group status - * - * @param id task group id - * @return is the task group available - */ - @Override - public boolean isTheTaskGroupAvailable(int id) { - return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1; - } - /** * query all task group by user id * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java index 794bdbab3a128..8fb3923307728 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java @@ -35,20 +35,6 @@ */ public interface TaskGroupMapper extends BaseMapper { - int robTaskGroupResource(@Param("id") int id, - @Param("currentUseSize") int currentUseSize, - @Param("queueId") int queueId, - @Param("queueStatus") int queueStatus); - - /** - * update table of task group - * - * @param id primary key - * @return affected rows - */ - int releaseTaskGroupResource(@Param("id") int id, @Param("useSize") int useSize, - @Param("queueId") int queueId, @Param("queueStatus") int queueStatus); - /** * select task groups paging * @@ -69,13 +55,6 @@ IPage queryTaskGroupPaging(IPage page, @Param("name") Stri */ TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name); - /** - * Select the groupSize > useSize Count - */ - int selectAvailableCountById(@Param("groupId") int groupId); - - int selectCountByIdStatus(@Param("id") int id, @Param("status") int status); - IPage 
queryTaskGroupPagingByProjectCode(Page page, @Param("projectCode") Long projectCode); /** @@ -87,4 +66,12 @@ IPage queryTaskGroupPaging(IPage page, @Param("name") Stri List listAuthorizedResource(@Param("userId") int userId); List selectByProjectCode(@Param("projectCode") long projectCode); + + List queryAvailableTaskGroups(); + + List queryUsedTaskGroups(); + + int acquireTaskGroupSlot(@Param("id") Integer id); + + int releaseTaskGroupSlot(@Param("id") Integer id); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java index 437a22efb7406..9af1c8900ad58 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java @@ -104,5 +104,22 @@ IPage queryTaskGroupQueueByTaskGroupIdPaging(Page workflowInstanceIds); + void deleteByTaskGroupIds(@Param("taskGroupIds") List taskGroupIds); + + void updateTaskGroupPriorityByTaskInstanceId(@Param("taskInstanceId") Integer taskInstanceId, + @Param("priority") int taskGroupPriority); + + List queryAllInQueueTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId, + @Param("inQueue") int inQueue); + + List queryAllInQueueTaskGroupQueue(); + + List queryByTaskInstanceId(@Param("taskInstanceId") Integer taskInstanceId); + + List queryUsingTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId, + @Param("status") int status, + @Param("inQueue") int inQueue, + @Param("forceStart") int forceStart); } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java similarity index 57% rename from 
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java rename to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java index db19c023dc862..ab75e9cb80602 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartResponse.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupDao.java @@ -15,27 +15,27 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.master.transportor; +package org.apache.dolphinscheduler.dao.repository; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; -@Data -@NoArgsConstructor -@AllArgsConstructor -public class TaskInstanceForceStartResponse { +import java.util.List; - private boolean success; +public interface TaskGroupDao extends IDao { - private String message; + List queryAllTaskGroups(); - public static TaskInstanceForceStartResponse success() { - return new TaskInstanceForceStartResponse(true, "dispatch success"); - } + /** + * Query the TaskGroups which useSize > 0 + */ + List queryUsedTaskGroups(); - public static TaskInstanceForceStartResponse failed(String message) { - return new TaskInstanceForceStartResponse(false, message); - } + /** + * Query the TaskGroups which useSize < groupSize + */ + List queryAvailableTaskGroups(); + boolean acquireTaskGroupSlot(Integer taskGroupId); + + boolean releaseTaskGroupSlot(Integer taskGroupId); } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java similarity 
index 52% rename from dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java rename to dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java index 0a4711fa1e808..38230de791809 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceForceStartRequest.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskGroupQueueDao.java @@ -15,28 +15,26 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.extract.master.transportor; +package org.apache.dolphinscheduler.dao.repository; -import lombok.Data; -import lombok.NoArgsConstructor; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; -@Data -@NoArgsConstructor -public class TaskInstanceForceStartRequest { +import java.util.List; - private String key; +public interface TaskGroupQueueDao extends IDao { - private int processInstanceId; + void deleteByWorkflowInstanceIds(List workflowInstanceIds); - private int taskInstanceId; + List queryAllInQueueTaskGroupQueue(); - public TaskInstanceForceStartRequest( - int processInstanceId, - int taskInstanceId) { - this.key = String.format("%d-%d", processInstanceId, taskInstanceId); + List queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId); - this.processInstanceId = processInstanceId; - this.taskInstanceId = taskInstanceId; - } + void updateTaskGroupQueueById(TaskGroupQueue taskGroupQueue); + List queryByTaskInstanceId(Integer taskInstanceId); + + /** + * Return the {@link TaskGroupQueue} which is inQueue and forceStart is no. 
+ */ + List queryUsingTaskGroupQueueByGroupId(Integer taskGroupId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java new file mode 100644 index 0000000000000..51e95156ebacd --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupDaoImpl.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupDao; + +import java.util.List; + +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Repository; + +@Slf4j +@Repository +public class TaskGroupDaoImpl extends BaseDao implements TaskGroupDao { + + public TaskGroupDaoImpl(@NonNull TaskGroupMapper taskGroupMapper) { + super(taskGroupMapper); + } + + @Override + public List queryAllTaskGroups() { + return mybatisMapper.selectList(null); + } + + @Override + public List queryUsedTaskGroups() { + return mybatisMapper.queryUsedTaskGroups(); + } + + @Override + public List queryAvailableTaskGroups() { + return mybatisMapper.queryAvailableTaskGroups(); + } + + @Override + public boolean acquireTaskGroupSlot(Integer taskGroupId) { + return mybatisMapper.acquireTaskGroupSlot(taskGroupId) > 0; + } + + @Override + public boolean releaseTaskGroupSlot(Integer taskGroupId) { + return mybatisMapper.releaseTaskGroupSlot(taskGroupId) > 0; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java new file mode 100644 index 0000000000000..406c1a4cf040b --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskGroupQueueDaoImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.repository.impl; + +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; +import org.apache.dolphinscheduler.dao.repository.BaseDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; + +import lombok.NonNull; + +import org.springframework.stereotype.Repository; + +@Repository +public class TaskGroupQueueDaoImpl extends BaseDao implements TaskGroupQueueDao { + + public TaskGroupQueueDaoImpl(@NonNull TaskGroupQueueMapper taskGroupQueueMapper) { + super(taskGroupQueueMapper); + } + + @Override + public void deleteByWorkflowInstanceIds(List workflowInstanceIds) { + if (CollectionUtils.isEmpty(workflowInstanceIds)) { + return; + } + mybatisMapper.deleteByWorkflowInstanceIds(workflowInstanceIds); + } + + @Override + public List queryAllInQueueTaskGroupQueue() { + return mybatisMapper.queryAllInQueueTaskGroupQueue(); + } + + @Override + public List queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId) { + return mybatisMapper.queryAllInQueueTaskGroupQueueByGroupId(taskGroupId, Flag.YES.getCode()); + } + + @Override + public void 
updateTaskGroupQueueById(TaskGroupQueue taskGroupQueue) { + mybatisMapper.updateById(taskGroupQueue); + } + + @Override + public List queryByTaskInstanceId(Integer taskInstanceId) { + return mybatisMapper.queryByTaskInstanceId(taskInstanceId); + } + + @Override + public List queryUsingTaskGroupQueueByGroupId(Integer taskGroupId) { + return mybatisMapper.queryUsingTaskGroupQueueByGroupId(taskGroupId, + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode(), + /* inQueue */ Flag.YES.getCode(), /* forceStart */ Flag.NO.getCode()); + } +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml index c3f1df0b0ed09..eb14e03dd4015 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml @@ -67,24 +67,6 @@ where project_code = #{projectCode} - - - update t_ds_task_group - set use_size = use_size + 1 - where id = #{id} - and use_size < group_size - and use_size = #{currentUseSize} - and (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus}) = 1 - - - - - update t_ds_task_group - set use_size = use_size-1 - where id = #{id} and use_size > 0 and - (select count(1) FROM t_ds_task_group_queue where id = #{queueId} and status = #{queueStatus} ) = 1 - - - select - count(1) + from t_ds_task_group - where - id = #{groupId} and use_size < group_size + where 1=1 + + and user_id = #{userId} + - select - count(1) + from t_ds_task_group - where - id = #{id} and status = #{status} + where use_size group_size - select - + from t_ds_task_group - where 1=1 - - and user_id = #{userId} - + where use_size ]]> 0 + + update t_ds_task_group + set use_size = use_size + 1 + where id = #{id} + and use_size < group_size + + + + update t_ds_task_group + set use_size = use_size - 1 + where 
id = #{id} + and use_size > 0 + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml index a5bd80fbad12b..31fb4ecd2640e 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml @@ -46,7 +46,7 @@ and group_id = #{groupId} - order by update_time desc + order by update_time desc, id desc select - queue.id, queue.task_name, queue.group_id, queue.process_id, queue.priority, queue.status - , queue.force_start, queue.create_time, queue.update_time, - process.name as processInstanceName,p.name as projectName,p.code as projectCode + queue.id, + queue.task_name, + queue.group_id, + queue.process_id, + queue.priority, + queue.in_queue, + queue.status, + queue.force_start, + queue.create_time, + queue.update_time, + process.name as processInstanceName, + p.name as projectName, + p.code as projectCode from t_ds_task_group_queue queue left join t_ds_process_instance process on queue.process_id = process.id left join t_ds_process_definition p_f on process.process_definition_code = p_f.code @@ -171,6 +181,15 @@ where process_id = #{workflowInstanceId} + + delete + from t_ds_task_group_queue + where process_id in + + #{i} + + + delete from t_ds_task_group_queue @@ -180,4 +199,38 @@ + + update t_ds_task_group_queue + set priority = #{priority} + where task_id = #{taskInstanceId} + + + + + + + + + + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index c22f12c09a183..656e34af0e2ed 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -1976,6 
+1976,7 @@ CREATE TABLE t_ds_task_group_queue in_queue int(4) DEFAULT '0' , create_time datetime DEFAULT NULL , update_time datetime DEFAULT NULL , + KEY idx_t_ds_task_group_queue_in_queue (in_queue) , PRIMARY KEY (id) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 347acb0a6496b..0d3e8b190a2b1 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -1964,6 +1964,7 @@ CREATE TABLE `t_ds_task_group_queue` ( `in_queue` tinyint(4) DEFAULT '0' COMMENT 'ready to get the queue by other task finish 0 NO ,1 YES', `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + KEY `idx_t_ds_task_group_queue_in_queue` (`in_queue`), PRIMARY KEY( `id` ) )ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8 COLLATE = utf8_bin; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 66bcb17f38dcb..b54f19a2d12cd 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -1946,6 +1946,8 @@ CREATE TABLE t_ds_task_group_queue ( PRIMARY KEY (id) ); +create index idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue(in_queue); + -- -- Table structure for table t_ds_task_group -- diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql index 957f072a43e06..372d6d0de2eb8 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql +++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/mysql/dolphinscheduler_ddl.sql @@ -109,4 +109,6 @@ ALTER TABLE `t_ds_process_definition_log` MODIFY COLUMN `version` int NOT NULL D ALTER TABLE `t_ds_process_instance` MODIFY COLUMN `process_definition_version` int NOT NULL DEFAULT 1 COMMENT "process definition version"; ALTER TABLE `t_ds_task_definition` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; ALTER TABLE `t_ds_task_definition_log` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; -ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; \ No newline at end of file +ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; + +create index t_ds_task_group_queue_in_queue_index on t_ds_task_group_queue (in_queue); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql index ce48a3374ef79..de942c91724cd 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.1_schema/postgresql/dolphinscheduler_ddl.sql @@ -29,4 +29,6 @@ ALTER TABLE "t_ds_process_definition_log" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_task_definition" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_task_definition_log" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_process_instance" ALTER COLUMN "process_definition_version" SET NOT NULL, ALTER COLUMN "process_definition_version" SET DEFAULT 1; -ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1; \ 
No newline at end of file +ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1; + +create index idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue(in_queue); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql index 6c73e112da326..1eab3db3cd2f3 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql @@ -80,4 +80,6 @@ ALTER TABLE `t_ds_process_definition_log` MODIFY COLUMN `version` int NOT NULL D ALTER TABLE `t_ds_process_instance` MODIFY COLUMN `process_definition_version` int NOT NULL DEFAULT 1 COMMENT "process definition version"; ALTER TABLE `t_ds_task_definition` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; ALTER TABLE `t_ds_task_definition_log` MODIFY COLUMN `version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; -ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; \ No newline at end of file +ALTER TABLE `t_ds_task_instance` MODIFY COLUMN `task_definition_version` int NOT NULL DEFAULT 1 COMMENT "task definition version"; + +create index t_ds_task_group_queue_in_queue_index on t_ds_task_group_queue (in_queue); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql index fd13ed3594eeb..1de53d36668bb 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql +++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -68,4 +68,6 @@ ALTER TABLE "t_ds_process_definition_log" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_task_definition" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_task_definition_log" ALTER COLUMN "version" SET DEFAULT 1; ALTER TABLE "t_ds_process_instance" ALTER COLUMN "process_definition_version" SET NOT NULL, ALTER COLUMN "process_definition_version" SET DEFAULT 1; -ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1; \ No newline at end of file +ALTER TABLE "t_ds_task_instance" ALTER COLUMN "task_definition_version" SET NOT NULL, ALTER COLUMN "task_definition_version" SET DEFAULT 1; + +create index idx_t_ds_task_group_queue_in_queue on t_ds_task_group_queue(in_queue); \ No newline at end of file diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java index a4fb547028b53..f85200b7a2c68 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java @@ -25,9 +25,6 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest; -import 
org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; @RpcService public interface ILogicTaskInstanceOperator { @@ -41,10 +38,4 @@ public interface ILogicTaskInstanceOperator { @RpcMethod LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest); - @RpcMethod - TaskInstanceForceStartResponse forceStartTaskInstance(TaskInstanceForceStartRequest taskForceStartRequest); - - @RpcMethod - void wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest); - } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java index 217e81322edf0..a535f8c6ca3fc 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IWorkflowInstanceService.java @@ -20,6 +20,8 @@ import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcService; import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse; @RpcService public interface IWorkflowInstanceService { @@ -30,4 +32,7 @@ public interface IWorkflowInstanceService { @RpcMethod WorkflowExecuteDto getWorkflowExecutingData(Integer workflowInstanceId); + @RpcMethod + TaskInstanceWakeupResponse wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest); + } diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java index 5bfbc03f1ce1f..ddca440436f17 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/TaskInstanceWakeupRequest.java @@ -17,26 +17,21 @@ package org.apache.dolphinscheduler.extract.master.transportor; +import java.io.Serializable; + +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @Data +@Builder @NoArgsConstructor -public class TaskInstanceWakeupRequest { - - private String key; +@AllArgsConstructor +public class TaskInstanceWakeupRequest implements Serializable { private int processInstanceId; private int taskInstanceId; - public TaskInstanceWakeupRequest( - int processInstanceId, - int taskInstanceId) { - this.key = String.format("%d-%d", processInstanceId, taskInstanceId); - - this.processInstanceId = processInstanceId; - this.taskInstanceId = taskInstanceId; - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 7d6600cc43390..d37ca9d0167f5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import 
org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap; +import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import javax.annotation.PostConstruct; @@ -84,6 +85,9 @@ public class MasterServer implements IStoppable { @Autowired private MasterSlotManager masterSlotManager; + @Autowired + private TaskGroupCoordinator taskGroupCoordinator; + public static void main(String[] args) { MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount); @@ -115,6 +119,7 @@ public void run() throws SchedulerException { this.failoverExecuteThread.start(); this.schedulerApi.start(); + this.taskGroupCoordinator.start(); MasterServerMetrics.registerMasterCpuUsageGauge(() -> { SystemMetrics systemMetrics = metricsProvider.getSystemMetrics(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java index b91da8cc511ca..7cd89b91fa7f6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/StateEventHandlerManager.java @@ -24,14 +24,20 @@ import java.util.Optional; import java.util.ServiceLoader; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class StateEventHandlerManager { private static final Map stateEventHandlerMap = new HashMap<>(); static { ServiceLoader.load(StateEventHandler.class) - .forEach(stateEventHandler -> stateEventHandlerMap.put(stateEventHandler.getEventType(), - stateEventHandler)); + .forEach(stateEventHandler -> { + log.info("Initialize StateEventHandler: {} for 
eventType: {}", + stateEventHandler.getClass().getName(), stateEventHandler.getEventType()); + stateEventHandlerMap.put(stateEventHandler.getEventType(), stateEventHandler); + }); } public static Optional getStateEventHandler(StateEventType stateEventType) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java index f790131a70962..2dde1315de07f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java @@ -62,7 +62,8 @@ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, if (task.getState().isFinished() && (taskStateEvent.getStatus() != null && taskStateEvent.getStatus().isRunning())) { String errorMessage = String.format( - "The current task instance state is %s, but the task state event status is %s, so the task state event will be ignored", + "The current TaskInstance: %s state is %s, but the task state event status is %s, so the task state event will be ignored", + task.getName(), task.getState().name(), taskStateEvent.getStatus().name()); log.warn(errorMessage); @@ -75,14 +76,6 @@ public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, return true; } workflowExecuteRunnable.taskFinished(task); - if (task.getTaskGroupId() > 0) { - log.info("The task instance need to release task Group: {}", task.getTaskGroupId()); - try { - workflowExecuteRunnable.releaseTaskGroup(task); - } catch (Exception e) { - throw new StateEventHandleException("Release task group failed", e); - } - } return true; } return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java deleted file mode 100644 index 4859e896b9c94..0000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.dolphinscheduler.server.master.event; - -import org.apache.dolphinscheduler.common.enums.StateEventType; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; - -import lombok.extern.slf4j.Slf4j; - -import com.google.auto.service.AutoService; - -@AutoService(StateEventHandler.class) -@Slf4j -public class TaskWaitTaskGroupStateHandler implements StateEventHandler { - - @Override - public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, - StateEvent stateEvent) { - log.info("Handle task instance wait task group event, taskInstanceId: {}", stateEvent.getTaskInstanceId()); - if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) { - log.info("Success wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId()); - } else { - log.info("Failed to wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId()); - } - return true; - } - - @Override - public StateEventType getEventType() { - return StateEventType.WAKE_UP_TASK_GROUP; - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java index 3a07d683649f0..de4095831e9b5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java @@ -32,12 +32,6 @@ public class LogicTaskInstanceOperationFunctionManager { @Autowired private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction; - @Autowired - private TaskInstanceForceStartOperationFunction taskInstanceForceStartOperationFunction; - - @Autowired - private TaskInstanceWakeupOperationFunction 
taskInstanceWakeupOperationFunction; - public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() { return logicITaskInstanceDispatchOperationFunction; } @@ -50,11 +44,4 @@ public LogicITaskInstancePauseOperationFunction getLogicTaskInstancePauseOperati return logicITaskInstancePauseOperationFunction; } - public TaskInstanceForceStartOperationFunction getTaskInstanceForceStartOperationFunction() { - return taskInstanceForceStartOperationFunction; - } - - public TaskInstanceWakeupOperationFunction getTaskInstanceWakeupOperationFunction() { - return taskInstanceWakeupOperationFunction; - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java index c99968d1e9894..39446bd89d267 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java @@ -24,9 +24,6 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; import lombok.extern.slf4j.Slf4j; @@ -58,15 +55,4 @@ public LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequ .operate(taskPauseRequest); } - @Override - public TaskInstanceForceStartResponse 
forceStartTaskInstance(TaskInstanceForceStartRequest taskForceStartRequest) { - return logicTaskInstanceOperationFunctionManager.getTaskInstanceForceStartOperationFunction() - .operate(taskForceStartRequest); - } - - @Override - public void wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest) { - logicTaskInstanceOperationFunctionManager.getTaskInstanceWakeupOperationFunction().operate(taskWakeupRequest); - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java deleted file mode 100644 index a15766e4c4e34..0000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceForceStartOperationFunction.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.dolphinscheduler.server.master.rpc; - -import org.apache.dolphinscheduler.common.enums.StateEventType; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartResponse; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; -import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class TaskInstanceForceStartOperationFunction - implements - ITaskInstanceOperationFunction { - - @Autowired - private StateEventResponseService stateEventResponseService; - - @Override - public TaskInstanceForceStartResponse operate(TaskInstanceForceStartRequest taskInstanceForceStartRequest) { - TaskStateEvent stateEvent = TaskStateEvent.builder() - .processInstanceId(taskInstanceForceStartRequest.getProcessInstanceId()) - .taskInstanceId(taskInstanceForceStartRequest.getTaskInstanceId()) - .key(taskInstanceForceStartRequest.getKey()) - .type(StateEventType.WAKE_UP_TASK_GROUP) - .build(); - try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); - log.info("Received forceStartTaskInstance, event: {}", stateEvent); - stateEventResponseService.addEvent2WorkflowExecute(stateEvent); - return TaskInstanceForceStartResponse.success(); - } finally { - LogUtils.removeWorkflowAndTaskInstanceIdMDC(); - } - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java index 
9a5b716e9077a..4a83874fb9613 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceWakeupOperationFunction.java @@ -17,12 +17,12 @@ package org.apache.dolphinscheduler.server.master.rpc; -import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; -import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import lombok.extern.slf4j.Slf4j; @@ -36,20 +36,30 @@ public class TaskInstanceWakeupOperationFunction ITaskInstanceOperationFunction { @Autowired - private StateEventResponseService stateEventResponseService; + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @Override public TaskInstanceWakeupResponse operate(TaskInstanceWakeupRequest taskInstanceWakeupRequest) { - TaskStateEvent stateEvent = TaskStateEvent.builder() - .processInstanceId(taskInstanceWakeupRequest.getProcessInstanceId()) - .taskInstanceId(taskInstanceWakeupRequest.getTaskInstanceId()) - .key(taskInstanceWakeupRequest.getKey()) - .type(StateEventType.WAKE_UP_TASK_GROUP) - .build(); try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), stateEvent.getTaskInstanceId()); - log.info("Received wakeupTaskInstance request, event: {}", stateEvent); - 
stateEventResponseService.addEvent2WorkflowExecute(stateEvent); + log.info("Received TaskInstanceWakeupRequest request{}", taskInstanceWakeupRequest); + + int workflowInstanceId = taskInstanceWakeupRequest.getProcessInstanceId(); + int taskInstanceId = taskInstanceWakeupRequest.getTaskInstanceId(); + LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId); + WorkflowExecuteRunnable workflowExecuteRunnable = + processInstanceExecCacheManager.getByProcessInstanceId(workflowInstanceId); + if (workflowExecuteRunnable == null) { + log.warn("cannot find WorkflowExecuteRunnable: {}, no need to Wakeup task", workflowInstanceId); + return TaskInstanceWakeupResponse.failed("cannot find WorkflowExecuteRunnable: " + workflowInstanceId); + } + DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = + workflowExecuteRunnable.getTaskExecuteRunnableById(taskInstanceId).orElse(null); + if (defaultTaskExecuteRunnable == null) { + log.warn("Cannot find DefaultTaskExecuteRunnable: {}, cannot Wakeup task", taskInstanceId); + return TaskInstanceWakeupResponse.failed("Cannot find DefaultTaskExecuteRunnable: " + taskInstanceId); + } + defaultTaskExecuteRunnable.dispatch(); + log.info("Success Wakeup TaskInstance: {}", taskInstanceId); return TaskInstanceWakeupResponse.success(); } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java index c3420cb1da7ed..d10c8b81c4006 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/WorkflowInstanceServiceImpl.java @@ -19,6 +19,8 @@ import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService; import 
org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.service.ExecutingService; @@ -36,6 +38,9 @@ public class WorkflowInstanceServiceImpl implements IWorkflowInstanceService { @Autowired private ExecutingService executingService; + @Autowired + private TaskInstanceWakeupOperationFunction taskInstanceWakeupOperationFunction; + @Override public void clearWorkflowMetrics(Long workflowDefinitionCode) { log.info("Receive clearWorkflowMetrics request: {}", workflowDefinitionCode); @@ -49,4 +54,9 @@ public WorkflowExecuteDto getWorkflowExecutingData(Integer workflowInstanceId) { executingService.queryWorkflowExecutingData(workflowInstanceId); return workflowExecuteDtoOptional.orElse(null); } + + @Override + public TaskInstanceWakeupResponse wakeupTaskInstance(TaskInstanceWakeupRequest taskWakeupRequest) { + return taskInstanceWakeupOperationFunction.operate(taskWakeupRequest); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 3759f2209c2bc..2db49c6f09032 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.StateEventType; -import 
org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; @@ -52,14 +51,11 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; -import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; -import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse; @@ -81,6 +77,7 @@ import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; import org.apache.dolphinscheduler.server.master.utils.TaskUtils; import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; @@ -114,7 +111,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import lombok.NonNull; @@ -227,6 +223,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { private final ListenerEventAlertManager listenerEventAlertManager; + private final TaskGroupCoordinator taskGroupCoordinator; + public WorkflowExecuteRunnable( @NonNull IWorkflowExecuteContext workflowExecuteContext, @NonNull CommandService commandService, @@ -238,7 +236,8 @@ public WorkflowExecuteRunnable( @NonNull CuringParamsService curingParamsService, @NonNull TaskInstanceDao taskInstanceDao, @NonNull DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory, - @NonNull ListenerEventAlertManager listenerEventAlertManager) { + @NonNull ListenerEventAlertManager listenerEventAlertManager, + @NonNull TaskGroupCoordinator taskGroupCoordinator) { this.processService = processService; this.commandService = commandService; this.processInstanceDao = processInstanceDao; @@ -250,6 +249,7 @@ public WorkflowExecuteRunnable( this.taskInstanceDao = taskInstanceDao; this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory; this.listenerEventAlertManager = listenerEventAlertManager; + this.taskGroupCoordinator = taskGroupCoordinator; TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size); } @@ -339,43 +339,6 @@ public int eventSize() { return this.stateEvents.size(); } - public boolean checkForceStartAndWakeUp(StateEvent stateEvent) { - TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); - if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { - log.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId()); - TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId()); - - DefaultTaskExecuteRunnable defaultTaskExecuteRunnable = - taskExecuteRunnableMap.get(taskInstance.getTaskCode()); - if 
(defaultTaskExecuteRunnable != null) { - defaultTaskExecuteRunnable.dispatch(); - this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); - log.info("Success force start task: {}, taskGroup: {}", taskGroupQueue.getTaskName(), - taskGroupQueue.getGroupId()); - } else { - log.warn("Cannot find the TaskExecuteRunnable: {}", taskGroupQueue.getTaskName()); - } - return true; - } - if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { - log.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId()); - boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); - if (acquireTaskGroup) { - TaskInstance taskInstance = taskInstanceDao.queryById(stateEvent.getTaskInstanceId()); - taskExecuteRunnableMap.get(taskInstance.getTaskCode()).dispatch(); - log.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId()); - return true; - } - log.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId()); - return false; - } else { - log.info( - "Failed to wake up the taskGroupQueue: {}, since the taskGroupQueue is not in queue, will no need to wake up.", - taskGroupQueue); - return true; - } - } public void processStart() { ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); @@ -408,6 +371,11 @@ public void taskFinished(TaskInstance taskInstance) throws StateEventHandleExcep taskExecuteRunnableMap.remove(taskInstance.getTaskCode()); stateWheelExecuteThread.removeTask4TimeoutCheck(workflowInstance, taskInstance); stateWheelExecuteThread.removeTask4RetryCheck(workflowInstance, taskInstance); + if (taskInstance.getTaskGroupId() > 0) { + releaseTaskGroupIfNeeded(taskInstance); + log.info("Release task Group slot: {} for taskInstance: {} ", taskInstance.getTaskGroupId(), + taskInstance.getId()); + } if 
(taskInstance.getState().isSuccess()) { completeTaskSet.add(taskInstance.getTaskCode()); @@ -462,51 +430,16 @@ public void taskFinished(TaskInstance taskInstance) throws StateEventHandleExcep } } - /** - * release task group - * - */ - public void releaseTaskGroup(TaskInstance taskInstance) { - ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + private void releaseTaskGroupIfNeeded(TaskInstance taskInstance) { // todo: use Integer if (taskInstance.getTaskGroupId() <= 0) { log.info("The current TaskInstance: {} doesn't use taskGroup, no need to release taskGroup", taskInstance.getName()); return; } - TaskInstance nextTaskInstance = processService.releaseTaskGroup(taskInstance); - if (nextTaskInstance == null) { - log.info( - "The current TaskInstance: {} is the last taskInstance in the taskGroup, no need to wakeup next taskInstance", - taskInstance.getName()); - return; - } - if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) { - TaskStateEvent nextEvent = TaskStateEvent.builder() - .processInstanceId(workflowInstance.getId()) - .taskInstanceId(nextTaskInstance.getId()) - .type(StateEventType.WAKE_UP_TASK_GROUP) - .build(); - stateEvents.add(nextEvent); - } else { - ProcessInstance processInstance = - processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); - if (processInstance == null) { - log.error("WorkflowInstance is null cannot wakeup, processInstanceId:{}", - nextTaskInstance.getProcessInstanceId()); - return; - } - if (processInstance.getHost() == null || Constants.NULL.equals(processInstance.getHost())) { - log.warn("The next WorkflowInstance: {} host is null no need to wakeup, maybe it is in failover", - processInstance); - return; - } - ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory - .getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class); - taskInstanceOperator.wakeupTaskInstance( - new 
TaskInstanceWakeupRequest(processInstance.getId(), nextTaskInstance.getId())); - } - log.info("Success send wakeup message to next taskInstance: {}", nextTaskInstance.getId()); + taskGroupCoordinator.releaseTaskGroupSlot(taskInstance); + log.info("Success release task Group slot: {} for taskInstance: {} ", taskInstance.getTaskGroupId(), + taskInstance.getName()); } /** @@ -790,10 +723,11 @@ public void endProcess() { } else { listenerEventAlertManager.publishProcessFailListenerEvent(workflowInstance, projectUser); } - if (checkTaskQueue()) { - // release task group - processService.releaseAllTaskGroup(workflowInstance.getId()); - } + taskInstanceMap.forEach((id, taskInstance) -> { + if (taskInstance != null && taskInstance.getTaskGroupId() > 0) { + releaseTaskGroupIfNeeded(taskInstance); + } + }); // Log the workflowInstance in detail log.info(WorkflowInstanceUtils.logWorkflowInstanceInDetails(workflowInstance)); } @@ -1006,19 +940,11 @@ private boolean executeTask(TaskInstance taskInstance) { // it will be wakeup when other tasks release the resource. int taskGroupId = taskInstance.getTaskGroupId(); if (taskGroupId > 0) { - boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(), - taskInstance.getName(), - taskGroupId, - taskInstance.getProcessInstanceId(), - taskInstance.getTaskGroupPriority()); - if (!acquireTaskGroup) { - log.info( - "Submitted task will not be dispatch right now because the first time to try to acquire" - + - " task group failed, taskInstanceName: {}, taskGroupId: {}", - taskInstance.getName(), taskGroupId); - return true; - } + taskGroupCoordinator.acquireTaskGroupSlot(taskInstance); + log.info("The TaskInstance: {} use taskGroup: {} to manage the resource, will wait to notify it", + taskInstance, + taskGroupId); + return true; } // 4. 
submit to dispatch queue tryToDispatchTaskInstance(taskInstance, taskExecuteRunnable); @@ -1371,7 +1297,7 @@ && tryToTakeOverTaskInstance(existTaskInstance)) { // set the task instance state to fault tolerance existTaskInstance.setFlag(Flag.NO); existTaskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE); - releaseTaskGroup(existTaskInstance); + releaseTaskGroupIfNeeded(existTaskInstance); validTaskMap.remove(existTaskInstance.getTaskCode()); taskInstanceDao.updateById(existTaskInstance); @@ -2090,16 +2016,6 @@ private List getRecoveryNodeCodeList(List recoverNodeList) return recoveryNodeCodeList; } - private boolean checkTaskQueue() { - AtomicBoolean result = new AtomicBoolean(false); - taskInstanceMap.forEach((id, taskInstance) -> { - if (taskInstance != null && taskInstance.getTaskGroupId() > 0) { - result.set(true); - } - }); - return result.get(); - } - private boolean isNewProcessInstance() { ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); if (Flag.YES.equals(workflowInstance.getRecovery())) { @@ -2126,6 +2042,17 @@ public Map getTaskExecuteRunnableMap() { return taskExecuteRunnableMap; } + public Optional getTaskExecuteRunnableById(Integer taskInstanceId) { + if (taskInstanceId == null) { + throw new IllegalArgumentException("taskInstanceId can't be null"); + } + TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId); + if (taskInstance == null) { + return Optional.empty(); + } + return Optional.ofNullable(taskExecuteRunnableMap.get(taskInstance.getTaskCode())); + } + public Map getWaitToRetryTaskInstanceMap() { return waitToRetryTaskInstanceMap; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java index fcd2a5f7215b9..7caed6ba757d1 100644 --- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.command.CommandService; @@ -73,6 +74,9 @@ public class WorkflowExecuteRunnableFactory { @Autowired private ListenerEventAlertManager listenerEventAlertManager; + @Autowired + private TaskGroupCoordinator taskGroupCoordinator; + public Optional createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException { try { Optional workflowExecuteRunnableContextOptional = @@ -88,9 +92,10 @@ public Optional createWorkflowExecuteRunnable(Command c curingGlobalParamsService, taskInstanceDao, defaultTaskExecuteRunnableFactory, - listenerEventAlertManager)); + listenerEventAlertManager, + taskGroupCoordinator)); } catch (Exception ex) { - throw new WorkflowCreateException("Create workflow execute runnable failed", ex); + throw new WorkflowCreateException("Create WorkflowExecuteRunnable failed", ex); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java new file mode 100644 index 0000000000000..1969f26b6968c --- /dev/null +++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/taskgroup/TaskGroupCoordinator.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.taskgroup; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskGroup; +import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupDao; +import org.apache.dolphinscheduler.dao.repository.TaskGroupQueueDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import 
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; +import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; +import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupResponse; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.registry.api.RegistryClient; +import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; +import org.apache.dolphinscheduler.spi.enums.Flag; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * TaskGroupCoordinator use to manage the task group slot. + * The task group slot is used to limit the number of tasks that can be run at the same time. + * The task group slot is managed by TaskGroupCoordinator. + *

+ *

+ *     The TaskGroupQueue is used to represent the task group slot.
+ *     A TaskGroupQueue whose inQueue is YES means the TaskGroupQueue is using a task group slot.
+ * 
+ *

+ * When a task instance needs to use a task group, we should use {@link TaskGroupCoordinator#acquireTaskGroupSlot(TaskInstance)} to acquire the task group slot; this method doesn't block. After calling it, you should directly stop dispatching the task instance. + * When the task group slot is available, the TaskGroupCoordinator will notify the waiting task instance to dispatch. + *

+ *     if(taskInstance.getTaskGroupId() > 0) {
+ *          taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
+ *          return;
+ *     }
+ * 
+ *

+ * When the task instance is finished, we should use {@link TaskGroupCoordinator#releaseTaskGroupSlot(TaskInstance)} to release the task group slot; this method doesn't block. + *

+ *     if(taskInstance.getTaskGroupId() > 0) {
+ *         taskGroupCoordinator.releaseTaskGroupSlot(taskInstance);
+ *     }
+ * 
+ */ +@Slf4j +@Component +public class TaskGroupCoordinator extends BaseDaemonThread { + + @Autowired + private RegistryClient registryClient; + + @Autowired + private TaskGroupDao taskGroupDao; + + @Autowired + private TaskGroupQueueDao taskGroupQueueDao; + + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Autowired + private ProcessInstanceDao processInstanceDao; + + public TaskGroupCoordinator() { + super("TaskGroupCoordinator"); + } + + @Override + public void start() { + log.info("TaskGroupCoordinator starting..."); + super.start(); + log.info("TaskGroupCoordinator started..."); + } + + @Override + public void run() { + while (!ServerLifeCycleManager.isStopped()) { + try { + if (!ServerLifeCycleManager.isRunning()) { + continue; + } + try { + registryClient.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath()); + // Due to some reason the task group slot may be not released, so we need to amend the task group + // slot. + // Make the taskGroup slot is correct + amendTaskGroupSlot(); + dealWithForceStartTaskGroupQueue(); + dealWithWaitingTaskGroupQueue(); + } finally { + registryClient.releaseLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath()); + } + } catch (Throwable e) { + log.error("TaskGroupCoordinator error", e); + } finally { + // sleep 5s + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5); + } + } + } + + private void amendTaskGroupSlot() { + // The TaskGroup useSize should equal to the TaskGroupQueue which inQueue is YES and forceStart is NO + List taskGroups = taskGroupDao.queryAllTaskGroups(); + if (CollectionUtils.isEmpty(taskGroups)) { + return; + } + for (TaskGroup taskGroup : taskGroups) { + List taskGroupQueues = + taskGroupQueueDao.queryUsingTaskGroupQueueByGroupId(taskGroup.getId()); + int actualUseSize = taskGroupQueues.size(); + if (taskGroup.getUseSize() == actualUseSize) { + continue; + } + log.warn("The TaskGroup: {} useSize is {}, but the actual use size is {}, will amend it", 
+ taskGroup.getName(), + taskGroup.getUseSize(), actualUseSize); + taskGroup.setUseSize(actualUseSize); + taskGroupDao.updateById(taskGroup); + } + } + + private void dealWithForceStartTaskGroupQueue() { + // Find the force start task group queue(Which is inQueue and forceStart is YES) + // Notify the related waiting task instance + // Set the taskGroupQueue status to RELEASE and remove it from queue + List taskGroupQueues = taskGroupQueueDao.queryAllInQueueTaskGroupQueue() + .stream() + .filter(taskGroupQueue -> Flag.YES.getCode() == taskGroupQueue.getForceStart()) + .collect(Collectors.toList()); + for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { + try { + LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId()); + // notify the waiting task instance + // We notify first, it notify failed, the taskGroupQueue will be in queue, and then we will retry it + // next time. + notifyWaitingTaskInstance(taskGroupQueue); + log.info("Notify the ForceStart waiting TaskInstance: {} for taskGroupQueue: {} success", + taskGroupQueue.getTaskName(), + taskGroupQueue.getId()); + + taskGroupQueue.setInQueue(Flag.NO.getCode()); + taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE); + taskGroupQueue.setUpdateTime(new Date()); + taskGroupQueueDao.updateTaskGroupQueueById(taskGroupQueue); + log.info("Release the force start TaskGroupQueue {}", taskGroupQueue); + } catch (UnsupportedOperationException unsupportedOperationException) { + releaseTaskGroupQueueSlot(taskGroupQueue); + log.info( + "Notify the ForceStart TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue", + taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException); + } catch (Throwable throwable) { + log.info("Notify the force start TaskGroupQueue {} failed", taskGroupQueue, throwable); + } finally { + LogUtils.removeTaskInstanceIdMDC(); + } + } + } + + private void dealWithWaitingTaskGroupQueue() { + // Find the TaskGroup which usage < maxSize. 
+ // Find the highest priority inQueue task group queue(Which is inQueue and status is Waiting and force start is + // NO) belong to the + // task group. + List taskGroups = taskGroupDao.queryAvailableTaskGroups(); + if (CollectionUtils.isEmpty(taskGroups)) { + log.debug("There is no available task group"); + return; + } + for (TaskGroup taskGroup : taskGroups) { + int availableSize = taskGroup.getGroupSize() - taskGroup.getUseSize(); + if (availableSize <= 0) { + log.info("TaskGroup {} is full, available size is {}", taskGroup, availableSize); + continue; + } + List taskGroupQueues = + taskGroupQueueDao.queryAllInQueueTaskGroupQueueByGroupId(taskGroup.getId()) + .stream() + .filter(taskGroupQueue -> Flag.NO.getCode() == taskGroupQueue.getForceStart()) + .limit(availableSize) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(taskGroupQueues)) { + log.debug("There is no waiting task group queue for task group {}", taskGroup.getName()); + continue; + } + for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { + try { + LogUtils.setTaskInstanceIdMDC(taskGroupQueue.getTaskId()); + // Reduce the taskGroupSize + boolean acquireResult = taskGroupDao.acquireTaskGroupSlot(taskGroup.getId()); + if (!acquireResult) { + log.error("Failed to acquire task group slot for task group {}", taskGroup); + continue; + } + // Notify the waiting task instance + // We notify first, it notify failed, the taskGroupQueue will be in queue, and then we will retry it + // next time. 
+ notifyWaitingTaskInstance(taskGroupQueue); + + // Set the taskGroupQueue status to RUNNING and remove from queue + taskGroupQueue.setInQueue(Flag.YES.getCode()); + taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); + taskGroupQueue.setUpdateTime(new Date()); + taskGroupQueueDao.updateTaskGroupQueueById(taskGroupQueue); + } catch (UnsupportedOperationException unsupportedOperationException) { + releaseTaskGroupQueueSlot(taskGroupQueue); + log.info( + "Notify the Waiting TaskInstance: {} for taskGroupQueue: {} failed, will release the taskGroupQueue", + taskGroupQueue.getTaskName(), taskGroupQueue.getId(), unsupportedOperationException); + } catch (Throwable throwable) { + log.error("Notify Waiting TaskGroupQueue: {} failed", taskGroupQueue, throwable); + } finally { + LogUtils.removeTaskInstanceIdMDC(); + } + } + } + } + + /** + * Acquire the task group slot. + *

+ * Will create a TaskGroupQueue in db, and then wait TaskGroupCoordinator to notify it. + * The TaskInstance shouldn't dispatch until the TaskGroupCoordinator notify it. + */ + public void acquireTaskGroupSlot(TaskInstance taskInstance) { + if (taskInstance.getTaskGroupId() <= 0) { + throw new IllegalArgumentException("The current task instance does not use task group"); + } + TaskGroup taskGroup = taskGroupDao.queryById(taskInstance.getTaskGroupId()); + if (taskGroup == null) { + log.error("The TaskGroup: {} is not exist, will not use TaskGroup", taskInstance.getTaskGroupId()); + } + // Write TaskGroupQueue in db, and then return wait TaskGroupCoordinator to notify it + // Set the taskGroupQueue status to WAIT_QUEUE and add to queue + // The queue only contains the taskGroupQueue which status is WAIT_QUEUE + Date now = new Date(); + TaskGroupQueue taskGroupQueue = TaskGroupQueue + .builder() + .taskId(taskInstance.getId()) + .taskName(taskInstance.getName()) + .groupId(taskInstance.getTaskGroupId()) + .processId(taskInstance.getProcessInstanceId()) + .priority(taskInstance.getTaskGroupPriority()) + .inQueue(Flag.YES.getCode()) + .forceStart(Flag.NO.getCode()) + .status(TaskGroupQueueStatus.WAIT_QUEUE) + .createTime(now) + .updateTime(now) + .build(); + taskGroupQueueDao.insert(taskGroupQueue); + } + + /** + * Release the task group slot. + *

+ * Only the task group queue which is in queue and forceStart is NO can be released. + */ + public void releaseTaskGroupSlot(TaskInstance taskInstance) { + if (taskInstance.getTaskGroupId() <= 0) { + throw new IllegalArgumentException("The current task instance does not use task group"); + } + List taskGroupQueues = taskGroupQueueDao.queryByTaskInstanceId(taskInstance.getId()); + for (TaskGroupQueue taskGroupQueue : taskGroupQueues) { + releaseTaskGroupQueueSlot(taskGroupQueue); + } + } + + private void notifyWaitingTaskInstance(TaskGroupQueue taskGroupQueue) { + // Find the related waiting task instance + // send RPC to notify the waiting task instance + TaskInstance taskInstance = taskInstanceDao.queryById(taskGroupQueue.getTaskId()); + if (taskInstance == null) { + throw new UnsupportedOperationException( + "The TaskInstance: " + taskGroupQueue.getTaskId() + " is not exist, no need to notify"); + } + // todo: We may need to add a new status to represent the task instance is waiting for task group slot + if (taskInstance.getState() != TaskExecutionStatus.SUBMITTED_SUCCESS) { + throw new UnsupportedOperationException( + "The TaskInstance: " + taskInstance.getId() + " state is " + taskInstance.getState() + + ", no need to notify"); + } + ProcessInstance processInstance = processInstanceDao.queryById(taskInstance.getProcessInstanceId()); + if (processInstance == null) { + throw new UnsupportedOperationException( + "The WorkflowInstance: " + taskInstance.getProcessInstanceId() + + " is not exist, no need to notify"); + } + if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) { + throw new UnsupportedOperationException( + "The WorkflowInstance: " + processInstance.getId() + " state is " + processInstance.getState() + + ", no need to notify"); + } + if (processInstance.getHost() == null || Constants.NULL.equals(processInstance.getHost())) { + throw new UnsupportedOperationException( + "WorkflowInstance host is null, maybe it is in failover: " + 
processInstance); + } + + TaskInstanceWakeupRequest taskInstanceWakeupRequest = TaskInstanceWakeupRequest.builder() + .processInstanceId(processInstance.getId()) + .taskInstanceId(taskInstance.getId()) + .build(); + + IWorkflowInstanceService iWorkflowInstanceService = SingletonJdkDynamicRpcClientProxyFactory + .getProxyClient(processInstance.getHost(), IWorkflowInstanceService.class); + TaskInstanceWakeupResponse taskInstanceWakeupResponse = + iWorkflowInstanceService.wakeupTaskInstance(taskInstanceWakeupRequest); + if (!taskInstanceWakeupResponse.isSuccess()) { + throw new UnsupportedOperationException( + "Notify TaskInstance: " + taskInstance.getId() + " failed: " + taskInstanceWakeupResponse); + } + log.info("Wake up TaskInstance: {} success", taskInstance.getName()); + } + + private void releaseTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) { + taskGroupQueue.setInQueue(Flag.NO.getCode()); + taskGroupQueue.setStatus(TaskGroupQueueStatus.RELEASE); + taskGroupQueue.setUpdateTime(new Date()); + taskGroupQueueDao.updateTaskGroupQueueById(taskGroupQueue); + log.info("Success release TaskGroupQueue: {}", taskGroupQueue); + } + +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index 28c0010e3b01b..c08fb206f4dca 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import 
org.apache.dolphinscheduler.server.master.runner.taskgroup.TaskGroupCoordinator; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -49,7 +50,6 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.text.ParseException; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -104,6 +104,8 @@ public class WorkflowExecuteRunnableTest { private ListenerEventAlertManager listenerEventAlertManager; + private TaskGroupCoordinator taskGroupCoordinator; + @BeforeEach public void init() throws Exception { applicationContext = Mockito.mock(ApplicationContext.class); @@ -138,6 +140,8 @@ public void init() throws Exception { Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph); Mockito.when(workflowGraph.getDag()).thenReturn(new DAG<>()); + taskGroupCoordinator = Mockito.mock(TaskGroupCoordinator.class); + workflowExecuteThread = Mockito.spy( new WorkflowExecuteRunnable( workflowExecuteContext, @@ -150,11 +154,12 @@ public void init() throws Exception { curingGlobalParamsService, taskInstanceDao, defaultTaskExecuteRunnableFactory, - listenerEventAlertManager)); + listenerEventAlertManager, + taskGroupCoordinator)); } @Test - public void testParseStartNodeName() throws ParseException { + public void testParseStartNodeName() { try { Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_START_NODES, "1,2,3"); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java index f5c07e66f65f5..a1f3bb02b77fb 100644 --- 
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/enums/RegistryNodeType.java @@ -28,6 +28,7 @@ public enum RegistryNodeType { MASTER("Master", "/nodes/master"), MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"), MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"), + MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"), WORKER("Worker", "/nodes/worker"), ALERT_SERVER("AlertServer", "/nodes/alert-server"), ALERT_LOCK("AlertNodeLock", "/lock/alert"), diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 70d2593a3218e..091d6353fb373 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; -import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; @@ -51,7 +50,6 @@ import org.apache.dolphinscheduler.service.model.TaskNode; import java.util.List; -import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; @@ -72,8 +70,6 @@ ProcessInstance constructProcessInstance(Command command, ProcessInstance findProcessInstanceById(int processId); - ProcessDefinition 
findProcessDefineById(int processDefinitionId); - ProcessDefinition findProcessDefinition(Long processDefinitionCode, int processDefinitionVersion); ProcessDefinition findProcessDefinitionByCode(Long processDefinitionCode); @@ -92,9 +88,6 @@ ProcessInstance constructProcessInstance(Command command, void setSubProcessParam(ProcessInstance subProcessInstance); - boolean submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, int commitRetryTimes, - long commitInterval); - @Transactional boolean submitTask(ProcessInstance processInstance, TaskInstance taskInstance); @@ -129,18 +122,10 @@ boolean submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskIn DataSource findDataSourceById(int id); - ProcessInstance findProcessInstanceByTaskId(int taskId); - List queryUdfFunListByIds(Integer[] ids); - List selectAllByProcessDefineCode(long[] codes); - - String queryUserQueueByProcessInstance(ProcessInstance processInstance); - ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId); - List getProjectListHavePerm(int userId); - List listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType); User getUserById(int userId); @@ -175,8 +160,6 @@ int saveTaskRelation(User operator, long projectCode, long processDefinitionCode List transformTask(List taskRelationList, List taskDefinitionLogs); - Map notifyProcessList(int processId); - DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId); int updateDqExecuteResultUserId(int taskInstanceId); @@ -195,16 +178,6 @@ List transformTask(List taskRelationList, DqComparisonType getComparisonTypeById(int id); - boolean acquireTaskGroup(int taskId, - String taskName, int groupId, - int processId, int priority); - - boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue); - - void releaseAllTaskGroup(int processInstanceId); - - TaskInstance releaseTaskGroup(TaskInstance taskInstance); - void changeTaskGroupQueueStatus(int taskId, 
TaskGroupQueueStatus status); TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, @@ -214,12 +187,6 @@ TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId, Integer priority, TaskGroupQueueStatus status); - int updateTaskGroupQueueStatus(Integer taskId, int status); - - int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue); - - TaskGroupQueue loadTaskGroupQueue(int taskId); - ProcessInstance loadNextProcess4Serial(long code, int state, int id); public String findConfigYamlByName(String clusterName); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 706e183cf0b4d..93b257ba27ace 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -67,12 +67,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; -import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.TaskGroup; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; @@ -410,17 +408,6 @@ public ProcessInstance findProcessInstanceById(int processId) { return processInstanceMapper.selectById(processId); } - /** - * find process define by id. 
- * - * @param processDefinitionId processDefinitionId - * @return process definition - */ - @Override - public ProcessDefinition findProcessDefineById(int processDefinitionId) { - return processDefineMapper.selectById(processDefinitionId); - } - /** * find process define by code and version. * @@ -1080,34 +1067,6 @@ private void initTaskInstance(TaskInstance taskInstance) { taskInstanceDao.updateById(taskInstance); } - /** - * retry submit task to db - */ - @Override - public boolean submitTaskWithRetry(ProcessInstance processInstance, TaskInstance taskInstance, - int commitRetryTimes, long commitInterval) { - int retryTimes = 1; - while (retryTimes <= commitRetryTimes) { - try { - // submit task to db - // Only want to use transaction here - if (submitTask(processInstance, taskInstance)) { - return true; - } - log.error( - "task commit to db failed , taskCode: {} has already retry {} times, please check the database", - taskInstance.getTaskCode(), - retryTimes); - Thread.sleep(commitInterval); - } catch (Exception e) { - log.error("task commit to db failed", e); - } finally { - retryTimes += 1; - } - } - return false; - } - /** * // todo: This method need to refactor, we find when the db down, but the taskInstanceId is not 0. 
It's better to change to void, rather than return TaskInstance * submit task to db @@ -1260,20 +1219,6 @@ private void initSubInstanceState(ProcessInstance childInstance) { } } - /** - * get sub work flow command type - * child instance exist: child command = fatherCommand - * child instance not exists: child command = fatherCommand[0] - */ - private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) { - CommandType commandType = parentProcessInstance.getCommandType(); - if (childInstance == null) { - String fatherHistoryCommand = parentProcessInstance.getHistoryCmd(); - commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]); - } - return commandType; - } - /** * update sub process definition * @@ -1573,21 +1518,6 @@ public DataSource findDataSourceById(int id) { return dataSourceMapper.selectById(id); } - /** - * find process instance by the task id - * - * @param taskId taskId - * @return process instance - */ - @Override - public ProcessInstance findProcessInstanceByTaskId(int taskId) { - TaskInstance taskInstance = taskInstanceMapper.selectById(taskId); - if (taskInstance != null) { - return processInstanceMapper.selectById(taskInstance.getProcessInstanceId()); - } - return null; - } - /** * find udf function list by id list string * @@ -1599,37 +1529,6 @@ public List queryUdfFunListByIds(Integer[] ids) { return udfFuncMapper.queryUdfByIdStr(ids, null); } - /** - * find schedule list by process define codes. 
- * - * @param codes codes - * @return schedule list - */ - @Override - public List selectAllByProcessDefineCode(long[] codes) { - return scheduleMapper.selectAllByProcessDefineArray(codes); - } - - /** - * query user queue by process instance - * - * @param processInstance processInstance - * @return queue - */ - @Override - public String queryUserQueueByProcessInstance(ProcessInstance processInstance) { - - String queue = ""; - if (processInstance == null) { - return queue; - } - User executor = userMapper.selectById(processInstance.getExecutorId()); - if (executor != null) { - queue = executor.getQueue(); - } - return queue; - } - /** * query project name and user name by processInstanceId. * @@ -1641,27 +1540,6 @@ public ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId); } - /** - * get have perm project list - * - * @param userId userId - * @return project list - */ - @Override - public List getProjectListHavePerm(int userId) { - List createProjects = projectMapper.queryProjectCreatedByUser(userId); - List authedProjects = projectMapper.queryAuthedProjectListByUserId(userId); - - if (createProjects == null) { - createProjects = new ArrayList<>(); - } - - if (authedProjects != null) { - createProjects.addAll(authedProjects); - } - return createProjects; - } - /** * list unauthorized udf function * @@ -2108,23 +1986,6 @@ public List transformTask(List taskRelationList, return taskNodeList; } - @Override - public Map notifyProcessList(int processId) { - HashMap processTaskMap = new HashMap<>(); - // find sub tasks - ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(processId); - if (processInstanceMap == null) { - return processTaskMap; - } - ProcessInstance fatherProcess = this.findProcessInstanceById(processInstanceMap.getParentProcessInstanceId()); - TaskInstance fatherTask = 
taskInstanceDao.queryById(processInstanceMap.getParentTaskInstanceId()); - - if (fatherProcess != null) { - processTaskMap.put(fatherProcess, fatherTask); - } - return processTaskMap; - } - @Override public DqExecuteResult getDqExecuteResultByTaskInstanceId(int taskInstanceId) { return dqExecuteResultMapper.getExecuteResultById(taskInstanceId); @@ -2195,167 +2056,6 @@ public DqComparisonType getComparisonTypeById(int id) { return dqComparisonTypeMapper.selectById(id); } - /** - * the first time (when submit the task ) get the resource of the task group - */ - @Override - public boolean acquireTaskGroup(int taskInstanceId, - String taskName, - int taskGroupId, - int workflowInstanceId, - int taskGroupPriority) { - TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId); - if (taskGroup == null) { - // we don't throw exception here, to avoid the task group has been deleted during workflow running - log.warn("The taskGroup is not exist no need to acquire taskGroup, taskGroupId: {}", taskGroupId); - return true; - } - // if task group is not applicable - if (taskGroup.getStatus() == Flag.NO.getCode()) { - log.warn("The taskGroup status is {}, no need to acquire taskGroup, taskGroupId: {}", taskGroup.getStatus(), - taskGroupId); - return true; - } - // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS - TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstanceId); - if (taskGroupQueue == null) { - taskGroupQueue = insertIntoTaskGroupQueue( - taskInstanceId, - taskName, - taskGroupId, - workflowInstanceId, - taskGroupPriority, - TaskGroupQueueStatus.WAIT_QUEUE); - log.info("Insert TaskGroupQueue: {} successfully", taskGroupQueue.getId()); - } else { - log.info("The task queue is already exist, taskId: {}", taskInstanceId); - if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) { - return true; - } - } - // check if there already exist higher priority tasks - List 
highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks( - taskGroupId, - taskGroupPriority, - TaskGroupQueueStatus.WAIT_QUEUE.getCode()); - if (CollectionUtils.isNotEmpty(highPriorityTasks)) { - return false; - } - // try to get taskGroup - int availableTaskGroupCount = taskGroupMapper.selectAvailableCountById(taskGroupId); - if (availableTaskGroupCount < 1) { - log.info( - "Failed to acquire taskGroup, there is no avaliable taskGroup, taskInstanceId: {}, taskGroupId: {}", - taskInstanceId, taskGroupId); - taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - return false; - } - return robTaskGroupResource(taskGroupQueue); - } - - /** - * try to get the task group resource(when other task release the resource) - */ - @Override - public boolean robTaskGroupResource(TaskGroupQueue taskGroupQueue) { - // set the default max size to avoid dead loop - for (int i = 0; i < 10; i++) { - TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId()); - if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) { - // remove - taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - log.info("The current task Group is full, taskGroup: {}", taskGroup); - return false; - } - int affectedCount = taskGroupMapper.robTaskGroupResource( - taskGroup.getId(), - taskGroup.getUseSize(), - taskGroupQueue.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode()); - if (affectedCount > 0) { - log.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), - taskGroupQueue.getId()); - taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); - this.taskGroupQueueMapper.updateById(taskGroupQueue); - this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - return true; - } - } - log.info("Failed to rob taskGroup, taskGroupQueue: {}", taskGroupQueue); - taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); - return false; - } - - 
@Override - public void releaseAllTaskGroup(int processInstanceId) { - List taskInstances = this.taskInstanceMapper.loadAllInfosNoRelease(processInstanceId, - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); - for (TaskInstance info : taskInstances) { - releaseTaskGroup(info); - } - } - - /** - * release the TGQ resource when the corresponding task is finished. - * - * @return the result code and msg - */ - @Override - public TaskInstance releaseTaskGroup(TaskInstance taskInstance) { - - TaskGroup taskGroup; - TaskGroupQueue thisTaskGroupQueue; - log.info("Begin to release task group: {}", taskInstance.getTaskGroupId()); - try { - do { - taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); - if (taskGroup == null) { - log.error("The taskGroup is not exist no need to release taskGroup, taskGroupId: {}", - taskInstance.getTaskGroupId()); - return null; - } - thisTaskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); - if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { - log.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId()); - return null; - } - if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE) { - log.info("The taskGroupQueue's status is in waiting, will not need to release task group"); - break; - } - } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() - && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), - taskGroup.getUseSize(), - thisTaskGroupQueue.getId(), - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1); - } catch (Exception e) { - log.error("release the task group error", e); - return null; - } - log.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId()); - - log.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId()); - changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); - TaskGroupQueue taskGroupQueue; - do { - 
taskGroupQueue = taskGroupQueueMapper.queryTheHighestPriorityTasks( - taskGroup.getId(), - TaskGroupQueueStatus.WAIT_QUEUE.getCode(), - Flag.NO.getCode(), - Flag.NO.getCode()); - if (taskGroupQueue == null) { - log.info("There is no taskGroupQueue need to be wakeup taskGroup: {}", taskGroup.getId()); - return null; - } - } while (this.taskGroupQueueMapper.updateInQueueCAS( - Flag.NO.getCode(), - Flag.YES.getCode(), - taskGroupQueue.getId()) != 1); - log.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}", - taskInstance.getTaskGroupId(), taskGroupQueue.getId()); - return taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); - } - /** * release the TGQ resource when the corresponding task is finished. * @@ -2396,21 +2096,6 @@ public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskInstanceId, return taskGroupQueue; } - @Override - public int updateTaskGroupQueueStatus(Integer taskId, int status) { - return taskGroupQueueMapper.updateStatusByTaskId(taskId, status); - } - - @Override - public int updateTaskGroupQueue(TaskGroupQueue taskGroupQueue) { - return taskGroupQueueMapper.updateById(taskGroupQueue); - } - - @Override - public TaskGroupQueue loadTaskGroupQueue(int taskId) { - return this.taskGroupQueueMapper.queryByTaskId(taskId); - } - @Override public ProcessInstance loadNextProcess4Serial(long code, int state, int id) { return this.processInstanceMapper.loadNextProcess4Serial(code, state, id); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 70157f6a0546d..5e20c5ea2b3f8 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -749,22 +749,6 @@ public void 
testCreateTaskGroupQueue() { Assertions.assertNotNull(taskGroupQueue); } - @Test - public void testDoRelease() { - - TaskGroupQueue taskGroupQueue = getTaskGroupQueue(); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setId(1); - taskInstance.setProcessInstanceId(1); - taskInstance.setTaskGroupId(taskGroupQueue.getGroupId()); - - when(taskGroupQueueMapper.queryByTaskId(1)).thenReturn(taskGroupQueue); - when(taskGroupQueueMapper.updateById(taskGroupQueue)).thenReturn(1); - - processService.releaseTaskGroup(taskInstance); - - } - private TaskGroupQueue getTaskGroupQueue() { TaskGroupQueue taskGroupQueue = new TaskGroupQueue(); taskGroupQueue.setTaskName("task name");