Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev_240307' into dev_240307
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhongJinHacker committed Mar 11, 2024
2 parents b40daa9 + 739d72a commit 1fefbf1
Show file tree
Hide file tree
Showing 26 changed files with 692 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class ProjectWorkerGroupController extends BaseController {
@ @RequestParam(value = "workerGroups", required = false) String workerGroups
* @return create result code
*/
@Operation(summary = "assignWorkerGroups", description = "CREATE_PROCESS_DEFINITION_NOTES")
@Operation(summary = "assignWorkerGroups", description = "ASSIGN_WORKER_GROUPS_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROJECT_CODE", schema = @Schema(implementation = long.class, example = "123456")),
@Parameter(name = "workerGroups", description = "WORKER_GROUP_LIST", schema = @Schema(implementation = List.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,20 @@ public Result verifyTaskCanDelete(@Parameter(hidden = true) @RequestAttribute(va
putMsg(result, Status.SUCCESS);
return result;
}

@Operation(summary = "queryDownstreamDependentTaskList", description = "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES")
@Parameters({
@Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = Long.class)),
@Parameter(name = "taskCode", description = "TASK_DEFINITION_CODE", required = false, schema = @Schema(implementation = Long.class, example = "123456789")),
})
@GetMapping(value = "/query-dependent-tasks")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKFLOW_LINEAGE_ERROR)
public Result<Map<String, Object>> queryDownstreamDependentTaskList(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@RequestParam(value = "workFlowCode") Long workFlowCode,
@RequestParam(value = "taskCode", required = false, defaultValue = "0") Long taskCode) {
Map<String, Object> result =
workFlowLineageService.queryDownstreamDependentTasks(workFlowCode, taskCode);
return returnDataList(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ public interface WorkFlowLineageService {
*/
Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode);

/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode);

/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,7 @@ public void offlineWorkflowDefinition(User loginUser, Long projectCode, Long wor
// do nothing if the workflow is already offline
return;
}

workflowDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinitionDao.updateById(workflowDefinition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -278,11 +279,29 @@ public Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitio
public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDefinitionCode) {
Set<TaskMainInfo> taskMainInfos = new HashSet<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode, processDefinitionCode);
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, 0);
List<TaskMainInfo> taskSubProcess =
workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode);
taskMainInfos.addAll(taskDependents);
taskMainInfos.addAll(taskSubProcess);
return taskMainInfos;
}

/**
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
@Override
public Map<String, Object> queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode) {
Map<String, Object> result = new HashMap<>();
List<TaskMainInfo> taskDependents =
workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode,
Objects.isNull(taskCode) ? 0 : taskCode.longValue());
result.put(Constants.DATA_LIST, taskDependents);
putMsg(result, Status.SUCCESS);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,17 @@ public void testQueryWorkFlowLineageByCode() {
Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode, code)).thenReturn(new HashMap<>());
assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code));
}

@Test
public void testQueryDownstreamDependentTaskList() {
long code = 1L;
long taskCode = 1L;
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(workFlowLineageService.queryDownstreamDependentTasks(code, taskCode))
.thenReturn(result);

assertDoesNotThrow(
() -> workFlowLineageController.queryDownstreamDependentTaskList(user, code, taskCode));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public class TaskMainInfo {
*/
private Date taskUpdateTime;

/**
* projectCode
*/
private long projectCode;

/**
* processDefinitionCode
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ List<TaskMainInfo> queryTaskSubProcessDepOnProcess(@Param("projectCode") long pr
* current method `queryTaskDepOnProcess`. Which mean with the same parameter processDefinitionCode, all tasks in
* `queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`.
*
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return List of TaskMainInfo
*/
List<TaskMainInfo> queryTaskDependentDepOnProcess(@Param("projectCode") long projectCode,
@Param("processDefinitionCode") long processDefinitionCode);
List<TaskMainInfo> queryTaskDependentOnProcess(@Param("processDefinitionCode") long processDefinitionCode,
@Param("taskCode") long taskCode);

/**
* Query all tasks depend on task, only downstream task support currently(from dependent task type).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,13 @@
</where>
</select>

<select id="queryTaskDependentDepOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
<select id="queryTaskDependentOnProcess" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.id
, td.name as taskName
, td.code as taskCode
, td.version as taskVersion
, td.task_type as taskType
, pd.project_code as projectCode
, ptr.process_definition_code as processDefinitionCode
, pd.name as processDefinitionName
, pd.version as processDefinitionVersion
Expand All @@ -205,16 +206,16 @@
join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version
join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version
<where>
<if test="projectCode != 0">
and ptr.project_code = #{projectCode}
</if>
<!-- ptr.process_definition_code != #{processDefinitionCode} query task not in current workflow -->
<!-- For dependnet task type, using `like concat('%"definitionCode":', #{processDefinitionCode}, '%')` -->
<if test="processDefinitionCode != 0">
and td.task_type = 'DEPENDENT'
and ptr.process_definition_code != #{processDefinitionCode}
and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%')
</if>
<if test="taskCode != 0">
and (td.task_params like concat('%"depTaskCode":', #{taskCode}, '%') or td.task_params like concat('%"depTaskCode":-1%'))
</if>
</where>
</select>

Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/locales/en_US/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ export default {
confirm_to_offline: 'Confirm to make the workflow offline?',
time_to_online: 'Confirm to make the Scheduler online?',
time_to_offline: 'Confirm to make the Scheduler offline?',
warning_dependent_tasks_title: 'Warning',
warning_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to make the workflow offline?',
warning_dependencies: 'Dependencies:',
delete_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the workflow.',
warning_offline_scheduler_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to make the scheduler offline?',
delete_task_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the task.',
warning_delete_scheduler_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to delete the scheduler?',
},
task: {
on_line: 'Online',
Expand Down Expand Up @@ -306,7 +313,8 @@ export default {
startup_parameter: 'Startup Parameter',
whether_dry_run: 'Whether Dry-Run',
please_choose: 'Please Choose',
remove_task_cache: 'Clear cache'
remove_task_cache: 'Clear cache',
delete_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the task.',
},
dag: {
create: 'Create Workflow',
Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/locales/zh_CN/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ export default {
confirm_to_offline: '是否确定下线该工作流?',
time_to_online: '是否确定上线该定时?',
time_to_offline: '是否确定下线该定时?',
warning_dependent_tasks_title: '警告',
warning_dependent_tasks_desc: '下游存在依赖, 下线操作可能会对下游任务产生影响. 你确定要下线该工作流嘛?',
warning_dependencies: '依赖如下:',
delete_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该工作流',
warning_offline_scheduler_dependent_tasks_desc: '下游存在依赖, 下线操作可能会对下游任务产生影响. 你确定要下线该定时嘛?',
delete_task_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该任务.',
warning_delete_scheduler_dependent_tasks_desc: '下游存在依赖, 删除定时可能会对下游任务产生影响. 你确定要删除该定时嘛?',
},
task: {
on_line: '线上',
Expand Down Expand Up @@ -304,7 +311,8 @@ export default {
startup_parameter: '启动参数',
whether_dry_run: '是否空跑',
please_choose: '请选择',
remove_task_cache: '清除缓存'
remove_task_cache: '清除缓存',
delete_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该任务定义',
},
dag: {
create: '创建工作流',
Expand Down
10 changes: 9 additions & 1 deletion dolphinscheduler-ui/src/service/modules/lineages/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

import { axios } from '@/service/service'
import { ProjectCodeReq, WorkflowCodeReq } from './types'
import {DependentTaskReq, ProjectCodeReq, WorkflowCodeReq} from './types'

export function queryWorkFlowList(projectCode: ProjectCodeReq): any {
return axios({
Expand All @@ -41,3 +41,11 @@ export function queryLineageByWorkFlowCode(
method: 'get'
})
}

export function queryDependentTasks(projectCode: number, params: DependentTaskReq): any {
return axios({
url: `/projects/${projectCode}/lineages/query-dependent-tasks`,
method: 'get',
params
})
}
5 changes: 5 additions & 0 deletions dolphinscheduler-ui/src/service/modules/lineages/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ interface WorkflowRes {
workFlowRelationList: WorkFlowRelationList[]
}

interface DependentTaskReq extends WorkflowCodeReq {
taskCode?: number
}

export {
ProjectCodeReq,
WorkflowCodeReq,
WorkFlowNameReq,
DependentTaskReq,
WorkflowRes,
WorkFlowListRes
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
defineComponent,
PropType,
h,
ref, watch
} from 'vue'
import { useI18n } from 'vue-i18n'
import {NEllipsis, NModal, NSpace} from 'naive-ui'
import {IDefinitionData} from "@/views/projects/workflow/definition/types";
import ButtonLink from "@/components/button-link";

const props = {
row: {
type: Object as PropType<IDefinitionData>,
default: {},
required: false
},
show: {
type: Boolean as PropType<boolean>,
default: false
},
required: {
type: Boolean as PropType<boolean>,
default: true
},
taskLinks: {
type: Array,
default: []
},
content: {
type: String,
default: ''
}
}

export default defineComponent({
name: 'dependenciesConfirm',
props,
emits: ['update:show', 'update:row', 'confirm'],
setup(props, ctx) {
const { t } = useI18n()

const showRef = ref(props.show)

const confirmToHandle = () => {
ctx.emit('confirm')
}

const cancelToHandle = () => {
ctx.emit('update:show', showRef)
}

const renderDownstreamDependencies = () => {
return h(
<NSpace vertical>
<div>{props.content}</div>
<div>{t('project.workflow.warning_dependencies')}</div>
{props.taskLinks.map((item: any) => {
return (
<ButtonLink
onClick={item.action}
disabled={false}
>
{{
default: () =>
h(NEllipsis,
{
style: 'max-width: 350px;line-height: 1.5'
},
() => item.text
)
}}
</ButtonLink>
)
})}
</NSpace>
)
}

watch(()=> props.show,
() => {
showRef.value = props.show
})

return {renderDownstreamDependencies, confirmToHandle, cancelToHandle, showRef}
},

render() {
const { t } = useI18n()

return (
<NModal
v-model:show={this.showRef}
preset={'dialog'}
type={this.$props.required? 'error':'warning'}
title={t('project.workflow.warning_dependent_tasks_title')}
positiveText={this.$props.required? '':t('project.workflow.confirm')}
negativeText={t('project.workflow.cancel')}
maskClosable={false}
onNegativeClick={this.cancelToHandle}
onPositiveClick={this.confirmToHandle}
onClose={this.cancelToHandle}
>
{{
default: () => (
this.renderDownstreamDependencies()
)
}}
</NModal>
)
}
})
Loading

0 comments on commit 1fefbf1

Please sign in to comment.