Skip to content

Commit

Permalink
Fix TaskGroupQueue will never be wakeup due to wakeup failed at one time
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jan 28, 2024
1 parent e20e066 commit 38ff493
Show file tree
Hide file tree
Showing 40 changed files with 807 additions and 819 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value =
* @param pageSize page size
* @return queue list
*/
@Operation(summary = "queryTasksByGroupId", description = "QUERY_ALL_TASKS_GROUP_NOTES")
@Operation(summary = "queryTaskGroupQueuesByGroupId", description = "QUERY_TASKS_GROUP_GROUP_QUEUES")
@Parameters({
@Parameter(name = "groupId", description = "GROUP_ID", required = false, schema = @Schema(implementation = int.class, example = "1", defaultValue = "-1")),
@Parameter(name = "taskInstanceName", description = "TASK_INSTANCE_NAME", required = false, schema = @Schema(implementation = String.class, example = "taskName")),
Expand All @@ -310,15 +310,21 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value =
@GetMapping(value = "/query-list-by-group-id")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR)
public Result queryTasksByGroupId(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName, processName, status,
groupId, pageNo, pageSize);
public Result queryTaskGroupQueues(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(
loginUser,
taskName,
processName,
status,
groupId,
pageNo,
pageSize);
return returnDataList(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ Map<String, Object> createTaskGroup(User loginUser, Long projectCode, String nam
Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
String description, int groupSize);

/**
* get task group status
*
* @param id task group id
* @return the result code and msg
*/
boolean isTheTaskGroupAvailable(int id);

/**
* query all task group by user id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
Expand Down Expand Up @@ -83,14 +82,12 @@
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.IStreamingTaskOperator;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.service.command.CommandService;
Expand Down Expand Up @@ -552,16 +549,20 @@ public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
Map<String, Object> result = new HashMap<>();
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId);
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
log.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId())
.orElseThrow(
() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()));
checkMasterExists();

if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) {
throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START);
}
taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueMapper.updateById(taskGroupQueue);

checkMasterExists();
return forceStart(processInstance, taskGroupQueue);
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}

public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) {
Expand Down Expand Up @@ -664,32 +665,6 @@ private Map<String, Object> updateProcessInstancePrepare(ProcessInstance process
return result;
}

/**
* prepare to update process instance command type and status
*
* @param processInstance process instance
* @return update result
*/
private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
Map<String, Object> result = new HashMap<>();
if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
log.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId());
putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START);
return result;
}

taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
processService.updateTaskGroupQueue(taskGroupQueue);
log.info("Sending force start command to master: {}.", processInstance.getHost());
ILogicTaskInstanceOperator iLogicTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class);
iLogicTaskInstanceOperator.forceStartTaskInstance(
new TaskInstanceForceStartRequest(processInstance.getId(), taskGroupQueue.getTaskId()));
putMsg(result, Status.SUCCESS);
return result;
}

/**
* check whether sub processes are offline before starting process definition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
* @return tasks list
*/
@Override
public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName, String processName, Integer status,
int groupId, int pageNo, int pageSize) {
public Map<String, Object> queryTasksByGroupId(User loginUser,
String taskName,
String processName,
Integer status,
int groupId,
int pageNo,
int pageSize) {
Map<String, Object> result = new HashMap<>();
Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
Expand All @@ -79,8 +84,13 @@ public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName,
return result;
}
List<Project> projects = projectMapper.selectBatchIds(projectIds);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page,
taskName, processName, status, groupId, projects);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(
page,
taskName,
processName,
status,
groupId,
projects);

pageInfo.setTotal((int) taskGroupQueue.getTotal());
pageInfo.setTotalList(taskGroupQueue.getRecords());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,6 @@ public Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
return result;
}

/**
* get task group status
*
* @param id task group id
* @return is the task group available
*/
@Override
public boolean isTheTaskGroupAvailable(int id) {
return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1;
}

/**
* query all task group by user id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,6 @@
*/
public interface TaskGroupMapper extends BaseMapper<TaskGroup> {

int robTaskGroupResource(@Param("id") int id,
@Param("currentUseSize") int currentUseSize,
@Param("queueId") int queueId,
@Param("queueStatus") int queueStatus);

/**
* update table of task group
*
* @param id primary key
* @return affected rows
*/
int releaseTaskGroupResource(@Param("id") int id, @Param("useSize") int useSize,
@Param("queueId") int queueId, @Param("queueStatus") int queueStatus);

/**
* select task groups paging
*
Expand All @@ -69,13 +55,6 @@ IPage<TaskGroup> queryTaskGroupPaging(IPage<TaskGroup> page, @Param("name") Stri
*/
TaskGroup queryByName(@Param("userId") int userId, @Param("name") String name);

/**
* Select the groupSize > useSize Count
*/
int selectAvailableCountById(@Param("groupId") int groupId);

int selectCountByIdStatus(@Param("id") int id, @Param("status") int status);

IPage<TaskGroup> queryTaskGroupPagingByProjectCode(Page<TaskGroup> page, @Param("projectCode") Long projectCode);

/**
Expand All @@ -87,4 +66,12 @@ IPage<TaskGroup> queryTaskGroupPaging(IPage<TaskGroup> page, @Param("name") Stri
List<TaskGroup> listAuthorizedResource(@Param("userId") int userId);

List<TaskGroup> selectByProjectCode(@Param("projectCode") long projectCode);

List<TaskGroup> queryAvailableTaskGroups();

List<TaskGroup> queryUsedTaskGroups();

int acquireTaskGroupSlot(@Param("id") Integer id);

int releaseTaskGroupSlot(@Param("id") Integer id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,22 @@ IPage<TaskGroupQueue> queryTaskGroupQueueByTaskGroupIdPaging(Page<TaskGroupQueue

void deleteByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);

void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") List<Integer> workflowInstanceIds);

void deleteByTaskGroupIds(@Param("taskGroupIds") List<Integer> taskGroupIds);

void updateTaskGroupPriorityByTaskInstanceId(@Param("taskInstanceId") Integer taskInstanceId,
@Param("priority") int taskGroupPriority);

List<TaskGroupQueue> queryAllInQueueTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId,
@Param("inQueue") int inQueue);

List<TaskGroupQueue> queryAllInQueueTaskGroupQueue();

List<TaskGroupQueue> queryByTaskInstanceId(@Param("taskInstanceId") Integer taskInstanceId);

List<TaskGroupQueue> queryUsingTaskGroupQueueByGroupId(@Param("taskGroupId") Integer taskGroupId,
@Param("status") int status,
@Param("inQueue") int inQueue,
@Param("forceStart") int forceStart);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.master.transportor;
package org.apache.dolphinscheduler.dao.repository;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskInstanceForceStartResponse {
import java.util.List;

private boolean success;
public interface TaskGroupDao extends IDao<TaskGroup> {

private String message;
List<TaskGroup> queryAllTaskGroups();

public static TaskInstanceForceStartResponse success() {
return new TaskInstanceForceStartResponse(true, "dispatch success");
}
/**
* Query the TaskGroups which useSize > 0
*/
List<TaskGroup> queryUsedTaskGroups();

public static TaskInstanceForceStartResponse failed(String message) {
return new TaskInstanceForceStartResponse(false, message);
}
/**
* Query the TaskGroups which useSize < groupSize
*/
List<TaskGroup> queryAvailableTaskGroups();

boolean acquireTaskGroupSlot(Integer taskGroupId);

boolean releaseTaskGroupSlot(Integer taskGroupId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,26 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.master.transportor;
package org.apache.dolphinscheduler.dao.repository;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;

@Data
@NoArgsConstructor
public class TaskInstanceForceStartRequest {
import java.util.List;

private String key;
public interface TaskGroupQueueDao extends IDao<TaskGroupQueue> {

private int processInstanceId;
void deleteByWorkflowInstanceIds(List<Integer> workflowInstanceIds);

private int taskInstanceId;
List<TaskGroupQueue> queryAllInQueueTaskGroupQueue();

public TaskInstanceForceStartRequest(
int processInstanceId,
int taskInstanceId) {
this.key = String.format("%d-%d", processInstanceId, taskInstanceId);
List<TaskGroupQueue> queryAllInQueueTaskGroupQueueByGroupId(Integer taskGroupId);

this.processInstanceId = processInstanceId;
this.taskInstanceId = taskInstanceId;
}
void updateTaskGroupQueueById(TaskGroupQueue taskGroupQueue);

List<TaskGroupQueue> queryByTaskInstanceId(Integer taskInstanceId);

/**
* Return the {@link TaskGroupQueue} which is inQueue and forceStart is no.
*/
List<TaskGroupQueue> queryUsingTaskGroupQueueByGroupId(Integer taskGroupId);
}
Loading

0 comments on commit 38ff493

Please sign in to comment.