Skip to content

Commit

Permalink
Merge branch 'audit-log' of https://github.com/qingwli/dolphinscheduler
Browse files Browse the repository at this point in the history
… into audit-log
  • Loading branch information
qingwli committed Mar 11, 2024
2 parents 963825b + 066c0ec commit 3ee0fcd
Show file tree
Hide file tree
Showing 30 changed files with 777 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class ProjectWorkerGroupController extends BaseController {
@ @RequestParam(value = "workerGroups", required = false) String workerGroups
* @return create result code
*/
@Operation(summary = "assignWorkerGroups", description = "CREATE_PROCESS_DEFINITION_NOTES")
@Operation(summary = "assignWorkerGroups", description = "ASSIGN_WORKER_GROUPS_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", schema = @Schema(implementation = long.class, example = "123456")),
@Parameter(name = "workerGroups", description = "WORKER_GROUP_LIST", schema = @Schema(implementation = List.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,20 @@ public Result verifyTaskCanDelete(@Parameter(hidden = true) @RequestAttribute(va
putMsg(result, Status.SUCCESS);
return result;
}

@Operation(summary = "queryDownstreamDependentTaskList", description = "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES")
@Parameters({
@Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = Long.class)),
@Parameter(name = "taskCode", description = "TASK_DEFINITION_CODE", required = false, schema = @Schema(implementation = Long.class, example = "123456789")),
})
@GetMapping(value = "/query-dependent-tasks")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKFLOW_LINEAGE_ERROR)
public Result<Map<String, Object>> queryDownstreamDependentTaskList(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@RequestParam(value = "workFlowCode") Long workFlowCode,
@RequestParam(value = "taskCode", required = false, defaultValue = "0") Long taskCode) {
Map<String, Object> result =
workFlowLineageService.queryDownstreamDependentTasks(workFlowCode, taskCode);
return returnDataList(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public interface WorkFlowLineageService {
*/
Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode);

/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode);

/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,7 @@ public void offlineWorkflowDefinition(User loginUser, Long projectCode, Long wor
// do nothing if the workflow is already offline
return;
}

workflowDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinitionDao.updateById(workflowDefinition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -278,11 +279,29 @@ public Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitio
public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode) {
Set<TaskMainInfo> taskMainInfos = new HashSet<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode, processDefinitionCode);
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, 0);
List<TaskMainInfo> taskSubProcess =
workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode);
taskMainInfos.addAll(taskDependents);
taskMainInfos.addAll(taskSubProcess);
return taskMainInfos;
}

/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
@Override
public Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode) {
Map<String, Object> result = new HashMap<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode,
Objects.isNull(taskCode) ? 0 : taskCode.longValue());
result.put(Constants.DATA_LIST, taskDependents);
putMsg(result, Status.SUCCESS);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,17 @@ public void testQueryWorkFlowLineageByCode() {
Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode, code)).thenReturn(new HashMap<>());
assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code));
}

@Test
public void testQueryDownstreamDependentTaskList() {
long code = 1L;
long taskCode = 1L;
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(workFlowLineageService.queryDownstreamDependentTasks(code, taskCode))
.thenReturn(result);

assertDoesNotThrow(
() -> workFlowLineageController.queryDownstreamDependentTaskList(user, code, taskCode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -494,9 +495,8 @@ public void testUpdateResourceContent() throws Exception {
ServiceException serviceException =
Assertions.assertThrows(ServiceException.class, () -> resourcesService.updateResourceContent(getUser(),
"/dolphinscheduler/123/resources/ResourcesServiceTest.jar", "123", "content"));
assertEquals(
"Internal Server Error: Resource file: /dolphinscheduler/123/resources/ResourcesServiceTest.jar is illegal",
serviceException.getMessage());
assertTrue(serviceException.getMessage()
.contains("Resource file: /dolphinscheduler/123/resources/ResourcesServiceTest.jar is illegal"));

// RESOURCE_NOT_EXIST
when(storageOperate.getResDir(Mockito.anyString())).thenReturn("/dolphinscheduler/123/resources");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public class TaskMainInfo {
*/
private Date taskUpdateTime;

/**
* projectCode
*/
private long projectCode;

/**
* processDefinitionCode
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode") long pr
* current method `queryTaskDepOnProcess`. Which mean with the same parameter processDefinitionCode, all tasks in
* `queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return List of TaskMainInfo
*/
List<TaskMainInfo> queryTaskDependentDepOnProcess(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
List<TaskMainInfo> queryTaskDependentOnProcess(@Param("processDefinitionCode") long processDefinitionCode,
@Param("taskCode") long taskCode);

/**
* Query all tasks depend on task, only downstream task support currently(from dependent task type).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,13 @@
</where>
</select>

<select id="queryTaskDependentDepOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
<select id="queryTaskDependentOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, pd.project_code as projectCode
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
Expand All @@ -205,16 +206,16 @@
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<!-- For dependnet task type, using `like concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
<if test="processDefinitionCode != 0">
and td.task_type = 'DEPENDENT'
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%')
</if>
<if test="taskCode != 0">
and (td.task_params like concat('%"depTaskCode":', #{taskCode}, '%') or td.task_params like concat('%"depTaskCode":-1%'))
</if>
</where>
</select>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class DependentItem {
private String dateValue;
private DependResult dependResult;
private TaskExecutionStatus status;
private Boolean parameterPassing;
private Boolean parameterPassing = false;

public String getKey() {
return String.format("%d-%d-%s-%s",
Expand Down
Loading

0 comments on commit 3ee0fcd

Please sign in to comment.