Skip to content

Commit

Permalink
[Improvement-16887][Dependent Task] Dependent task improvement (#16910)
Browse files Browse the repository at this point in the history
  • Loading branch information
SbloodyS authored Jan 2, 2025
1 parent 9237aa8 commit 50923e7
Show file tree
Hide file tree
Showing 35 changed files with 886 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ContextType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
Expand All @@ -53,11 +54,14 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.AbstractTaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceDependentDetails;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
Expand All @@ -70,6 +74,8 @@
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.model.ITaskInstanceContext;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
Expand All @@ -96,6 +102,7 @@

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -174,6 +181,9 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements Work
@Autowired
private CuringParamsService curingGlobalParamsService;

@Autowired
private TaskInstanceContextDao taskInstanceContextDao;

/**
* return top n SUCCESS workflow instance order by running time which started between startTime and endTime
*/
Expand All @@ -184,7 +194,7 @@ public Map<String, Object> queryTopNLongestRunningWorkflowInstance(User loginUse
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand Down Expand Up @@ -233,7 +243,7 @@ public Map<String, Object> queryWorkflowInstanceById(User loginUser, long projec
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand All @@ -245,7 +255,7 @@ public Map<String, Object> queryWorkflowInstanceById(User loginUser, long projec
workflowInstance.getWorkflowDefinitionVersion());

if (workflowDefinition == null || projectCode != workflowDefinition.getProjectCode()) {
log.error("workflow definition does not exist, projectCode:{}.", projectCode);
log.error("workflow definition does not exist, projectCode: {}.", projectCode);
putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST, workflowInstanceId);
} else {
workflowInstance.setLocations(workflowDefinition.getLocations());
Expand Down Expand Up @@ -443,7 +453,7 @@ public Map<String, Object> queryTaskListByWorkflowInstanceId(User loginUser, lon
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand All @@ -460,15 +470,47 @@ public Map<String, Object> queryTaskListByWorkflowInstanceId(User loginUser, lon
List<TaskInstance> taskInstanceList =
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstanceId,
workflowInstance.getTestFlag());
List<TaskInstanceDependentDetails<ITaskInstanceContext>> taskInstanceDependentDetailsList =
setTaskInstanceDependentResult(taskInstanceList);

Map<String, Object> resultMap = new HashMap<>();
resultMap.put(WORKFLOW_INSTANCE_STATE, workflowInstance.getState().toString());
resultMap.put(TASK_LIST, taskInstanceList);
resultMap.put(TASK_LIST, taskInstanceDependentDetailsList);
result.put(DATA_LIST, resultMap);

putMsg(result, Status.SUCCESS);
return result;
}

private List<TaskInstanceDependentDetails<ITaskInstanceContext>> setTaskInstanceDependentResult(List<TaskInstance> taskInstanceList) {
List<TaskInstanceDependentDetails<ITaskInstanceContext>> taskInstanceDependentDetailsList =
taskInstanceList.stream()
.map(taskInstance -> {
TaskInstanceDependentDetails<ITaskInstanceContext> taskInstanceDependentDetails =
new TaskInstanceDependentDetails<>();
BeanUtils.copyProperties(taskInstance, taskInstanceDependentDetails);
return taskInstanceDependentDetails;
}).collect(Collectors.toList());
List<Integer> taskInstanceIdList = taskInstanceList.stream()
.map(TaskInstance::getId).collect(Collectors.toList());
List<TaskInstanceContext> taskInstanceContextList =
taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList,
ContextType.DEPENDENT_RESULT_CONTEXT);
for (TaskInstanceContext taskInstanceContext : taskInstanceContextList) {
for (AbstractTaskInstanceContext dependentResultTaskInstanceContext : taskInstanceContext
.getTaskInstanceContext()) {
for (TaskInstanceDependentDetails<ITaskInstanceContext> taskInstanceDependentDetails : taskInstanceDependentDetailsList) {
if (taskInstanceDependentDetails.getId().equals(taskInstanceContext.getTaskInstanceId())) {
taskInstanceDependentDetails
.setTaskInstanceDependentResult(
dependentResultTaskInstanceContext);
}
}
}
}
return taskInstanceDependentDetailsList;
}

@Override
public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
TaskInstance taskInstance = taskInstanceDao.queryById(taskId);
Expand All @@ -488,7 +530,7 @@ public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUs
.queryAllSubWorkflowInstance((long) taskInstance.getWorkflowInstanceId(),
taskInstance.getTaskCode());
List<Long> allSubWorkflowInstanceId = relationSubWorkflows.stream()
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList());
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList());
List<WorkflowInstance> allSubWorkflows = workflowInstanceDao.queryByIds(allSubWorkflowInstanceId);

if (allSubWorkflows == null || allSubWorkflows.isEmpty()) {
Expand Down Expand Up @@ -539,7 +581,7 @@ public Map<String, Object> querySubWorkflowInstanceByTaskId(User loginUser, long
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand Down Expand Up @@ -693,7 +735,7 @@ public Map<String, Object> updateWorkflowInstance(User loginUser, long projectCo
"Update task relations complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
projectCode, workflowDefinition.getCode(), insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, workflowDefinition);
result.put(DATA_LIST, workflowDefinition);
} else {
log.info(
"Update task relations error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
Expand Down Expand Up @@ -750,7 +792,7 @@ public Map<String, Object> queryParentInstanceBySubId(User loginUser, long proje
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand Down Expand Up @@ -824,7 +866,7 @@ public Map<String, Object> viewVariables(long projectCode, Integer workflowInsta
if (workflowInstance == null) {
log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode,
workflowInstanceId);
putMsg(result, Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
return result;
}

Expand Down Expand Up @@ -918,7 +960,7 @@ public Map<String, Object> viewGantt(long projectCode, Integer workflowInstanceI
if (workflowInstance == null) {
log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode,
workflowInstanceId);
putMsg(result, Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ContextType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
Expand All @@ -42,10 +43,12 @@
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
Expand All @@ -60,10 +63,12 @@
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.model.TaskNode;
Expand All @@ -90,6 +95,7 @@
import org.mockito.quality.Strictness;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand Down Expand Up @@ -154,6 +160,9 @@ public class WorkflowInstanceServiceTest {
@Mock
private WorkflowInstanceMapDao workflowInstanceMapDao;

@Mock
private TaskInstanceContextDao taskInstanceContextDao;

private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
Expand Down Expand Up @@ -465,6 +474,18 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException {
taskInstance.setTaskType("SHELL");
List<TaskInstance> taskInstanceList = new ArrayList<>();
taskInstanceList.add(taskInstance);
List<DependentResultTaskInstanceContext> dependentResultTaskInstanceContextList = new ArrayList<>();
TaskInstanceContext taskInstanceContext = new TaskInstanceContext();
taskInstanceContext.setTaskInstanceId(0);
taskInstanceContext.setContextType(ContextType.DEPENDENT_RESULT_CONTEXT);
DependentResultTaskInstanceContext dependentResultTaskInstanceContext =
new DependentResultTaskInstanceContext();
dependentResultTaskInstanceContext.setProjectCode(projectCode);
dependentResultTaskInstanceContext.setDependentResult(DependResult.SUCCESS);
taskInstanceContext.setTaskInstanceContext(
Lists.asList(dependentResultTaskInstanceContext, new DependentResultTaskInstanceContext[0]));
List<Integer> taskInstanceIdList = new ArrayList<>();
taskInstanceIdList.add(0);
Result res = new Result();
res.setCode(Status.SUCCESS.ordinal());
res.setData("xxx");
Expand All @@ -476,6 +497,9 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException {
workflowInstance.getTestFlag()))
.thenReturn(taskInstanceList);
when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res);
when(taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList,
ContextType.DEPENDENT_RESULT_CONTEXT))
.thenReturn(Lists.asList(taskInstanceContext, new TaskInstanceContext[0]));
Map<String, Object> successRes =
workflowInstanceService.queryTaskListByWorkflowInstanceId(loginUser, projectCode, 1);
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,13 @@ public final class Constants {
public static final String SUBWORKFLOW_INSTANCE_ID = "subWorkflowInstanceId";
public static final String WORKFLOW_INSTANCE_STATE = "workflowInstanceState";
public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
public static final String DEPENDENCE = "dependence";
public static final String TASK_LIST = "taskList";
public static final String QUEUE = "queue";
public static final String QUEUE_NAME = "queueName";
public static final String DEPENDENT_SPLIT = ":||";

/**
* dependent task
*/
public static final long DEPENDENT_ALL_TASK_CODE = -1;
public static final long DEPENDENT_WORKFLOW_CODE = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.enums;

import lombok.Getter;

@Getter
public enum ContextType {

DEPENDENT_RESULT_CONTEXT;

public static ContextType of(String name) {
for (ContextType contextType : values()) {
if (contextType.name().equalsIgnoreCase(name)) {
return contextType;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.common.enums.ContextType;
import org.apache.dolphinscheduler.dao.model.ITaskInstanceContext;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public abstract class AbstractTaskInstanceContext implements ITaskInstanceContext {

protected ContextType contextType;
}
Loading

0 comments on commit 50923e7

Please sign in to comment.