diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectWorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectWorkerGroupController.java index a9ac07479801..24bd3eddd98b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectWorkerGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectWorkerGroupController.java @@ -68,7 +68,7 @@ public class ProjectWorkerGroupController extends BaseController { @ @RequestParam(value = "workerGroups", required = false) String workerGroups * @return create result code */ - @Operation(summary = "assignWorkerGroups", description = "CREATE_PROCESS_DEFINITION_NOTES") + @Operation(summary = "assignWorkerGroups", description = "ASSIGN_WORKER_GROUPS_NOTES") @Parameters({ @Parameter(name = "projectCode", description = "PROJECT_CODE", schema = @Schema(implementation = long.class, example = "123456")), @Parameter(name = "workerGroups", description = "WORKER_GROUP_LIST", schema = @Schema(implementation = List.class)) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java index d37928610e4f..6a6b110d6ea3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageController.java @@ -135,4 +135,20 @@ public Result verifyTaskCanDelete(@Parameter(hidden = true) @RequestAttribute(va putMsg(result, Status.SUCCESS); return result; } + + @Operation(summary = "queryDownstreamDependentTaskList", description = "QUERY_DOWNSTREAM_DEPENDENT_TASK_NOTES") + @Parameters({ + @Parameter(name = "workFlowCode", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = Long.class)), + @Parameter(name = "taskCode", description = "TASK_DEFINITION_CODE", required = false, schema = @Schema(implementation = Long.class, example = "123456789")), + }) + @GetMapping(value = "/query-dependent-tasks") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_WORKFLOW_LINEAGE_ERROR) + public Result> queryDownstreamDependentTaskList(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, + @RequestParam(value = "workFlowCode") Long workFlowCode, + @RequestParam(value = "taskCode", required = false, defaultValue = "0") Long taskCode) { + Map result = + workFlowLineageService.queryDownstreamDependentTasks(workFlowCode, taskCode); + return returnDataList(result); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java index 5b365553609e..2e535f330704 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java @@ -45,6 +45,15 @@ public interface WorkFlowLineageService { */ Set queryTaskDepOnProcess(long projectCode, long processDefinitionCode); + /** + * Query downstream tasks depend on a process definition or a task + * + * @param processDefinitionCode Process definition code want to query tasks dependence + * @param taskCode Task code want to query tasks dependence + * @return downstream dependent tasks + */ + Map queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode); + /** * Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method. * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 5624f4de4fad..173268c53ac6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -2537,6 +2537,7 @@ public void offlineWorkflowDefinition(User loginUser, Long projectCode, Long wor // do nothing if the workflow is already offline return; } + workflowDefinition.setReleaseState(ReleaseState.OFFLINE); processDefinitionDao.updateById(workflowDefinition); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java index 15f39d696f9a..014d22af5724 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -278,11 +279,29 @@ public Optional taskDepOnTaskMsg(long projectCode, long processDefinitio public Set queryTaskDepOnProcess(long projectCode, long processDefinitionCode) { Set taskMainInfos = new HashSet<>(); List taskDependents = - workFlowLineageMapper.queryTaskDependentDepOnProcess(projectCode, processDefinitionCode); + workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, 0); List taskSubProcess = workFlowLineageMapper.queryTaskSubProcessDepOnProcess(projectCode, processDefinitionCode); taskMainInfos.addAll(taskDependents); taskMainInfos.addAll(taskSubProcess); return taskMainInfos; } + + /** + * Query downstream tasks depend on a process definition or a task + * + * @param processDefinitionCode Process definition code want to query tasks dependence + * @param taskCode Task code want to query tasks dependence + * @return downstream dependent tasks + */ + @Override + public Map queryDownstreamDependentTasks(Long processDefinitionCode, Long taskCode) { + Map result = new HashMap<>(); + List taskDependents = + workFlowLineageMapper.queryTaskDependentOnProcess(processDefinitionCode, + Objects.isNull(taskCode) ? 0 : taskCode.longValue()); + result.put(Constants.DATA_LIST, taskDependents); + putMsg(result, Status.SUCCESS); + return result; + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java index b0f7bbc362fa..6329cc584adf 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkFlowLineageControllerTest.java @@ -86,4 +86,17 @@ public void testQueryWorkFlowLineageByCode() { Mockito.when(workFlowLineageService.queryWorkFlowLineageByCode(projectCode, code)).thenReturn(new HashMap<>()); assertDoesNotThrow(() -> workFlowLineageController.queryWorkFlowLineageByCode(user, projectCode, code)); } + + @Test + public void testQueryDownstreamDependentTaskList() { + long code = 1L; + long taskCode = 1L; + Map result = new HashMap<>(); + result.put(Constants.STATUS, Status.SUCCESS); + Mockito.when(workFlowLineageService.queryDownstreamDependentTasks(code, taskCode)) + .thenReturn(result); + + assertDoesNotThrow( + () -> workFlowLineageController.queryDownstreamDependentTaskList(user, code, taskCode)); + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index fc1766214796..a01ce75a4c2b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @@ -494,9 +495,8 @@ public void testUpdateResourceContent() throws Exception { ServiceException serviceException = Assertions.assertThrows(ServiceException.class, () -> resourcesService.updateResourceContent(getUser(), "/dolphinscheduler/123/resources/ResourcesServiceTest.jar", "123", "content")); - assertEquals( - "Internal Server Error: Resource file: /dolphinscheduler/123/resources/ResourcesServiceTest.jar is illegal", - serviceException.getMessage()); + assertTrue(serviceException.getMessage() + .contains("Resource file: /dolphinscheduler/123/resources/ResourcesServiceTest.jar is illegal")); // RESOURCE_NOT_EXIST when(storageOperate.getResDir(Mockito.anyString())).thenReturn("/dolphinscheduler/123/resources"); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java index 7b5492b75b21..16b5887834ad 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskMainInfo.java @@ -62,6 +62,11 @@ public class TaskMainInfo { */ private Date taskUpdateTime; + /** + * projectCode + */ + private long projectCode; + /** * processDefinitionCode */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java index b47731c5915f..649e188e9c8a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.java @@ -124,12 +124,12 @@ List queryTaskSubProcessDepOnProcess(@Param("projectCode") long pr * current method `queryTaskDepOnProcess`. Which mean with the same parameter processDefinitionCode, all tasks in * `queryTaskDepOnTask` are in the result of method `queryTaskDepOnProcess`. * - * @param projectCode Project code want to query tasks dependence * @param processDefinitionCode Process definition code want to query tasks dependence + * @param taskCode Task code want to query tasks dependence * @return List of TaskMainInfo */ - List queryTaskDependentDepOnProcess(@Param("projectCode") long projectCode, - @Param("processDefinitionCode") long processDefinitionCode); + List queryTaskDependentOnProcess(@Param("processDefinitionCode") long processDefinitionCode, + @Param("taskCode") long taskCode); /** * Query all tasks depend on task, only downstream task support currently(from dependent task type). diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml index 73ecd12410d4..51c60394be36 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkFlowLineageMapper.xml @@ -191,12 +191,13 @@ - select td.id , td.name as taskName , td.code as taskCode , td.version as taskVersion , td.task_type as taskType + , pd.project_code as projectCode , ptr.process_definition_code as processDefinitionCode , pd.name as processDefinitionName , pd.version as processDefinitionVersion @@ -205,9 +206,6 @@ join t_ds_process_task_relation ptr on ptr.post_task_code = td.code and td.version = ptr.post_task_version join t_ds_process_definition pd on pd.code = ptr.process_definition_code and pd.version = ptr.process_definition_version - - and ptr.project_code = #{projectCode} - @@ -215,6 +213,9 @@ and ptr.process_definition_code != #{processDefinitionCode} and td.task_params like concat('%"definitionCode":', #{processDefinitionCode}, '%') + + and (td.task_params like concat('%"depTaskCode":', #{taskCode}, '%') or td.task_params like concat('%"depTaskCode":-1%')) + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java index 360fac42b61c..47f42c416c20 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentItem.java @@ -35,7 +35,7 @@ public class DependentItem { private String dateValue; private DependResult dependResult; private TaskExecutionStatus status; - private Boolean parameterPassing; + private Boolean parameterPassing = false; public String getKey() { return String.format("%d-%d-%s-%s", diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/test/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/test/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTaskTest.java index 9f741e036006..11625ee3996b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/test/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/test/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTaskTest.java @@ -20,8 +20,6 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -33,6 +31,10 @@ import software.amazon.awssdk.services.datasync.model.CancelTaskExecutionResponse; import software.amazon.awssdk.services.datasync.model.CreateTaskRequest; import software.amazon.awssdk.services.datasync.model.CreateTaskResponse; +import software.amazon.awssdk.services.datasync.model.DescribeTaskExecutionRequest; +import software.amazon.awssdk.services.datasync.model.DescribeTaskExecutionResponse; +import software.amazon.awssdk.services.datasync.model.DescribeTaskRequest; +import software.amazon.awssdk.services.datasync.model.DescribeTaskResponse; import software.amazon.awssdk.services.datasync.model.StartTaskExecutionRequest; import software.amazon.awssdk.services.datasync.model.StartTaskExecutionResponse; import software.amazon.awssdk.services.datasync.model.TaskExecutionStatus; @@ -42,9 +44,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.MockedStatic; -import org.mockito.Mockito; +import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) @@ -56,26 +58,22 @@ public class DatasyncTaskTest { private static final String mockTaskArn = "arn:aws:datasync:ap-northeast-3:523202806641:task/task-071ca64ff4c2f0d4a"; - DatasyncHook datasyncHook; - + @InjectMocks + @Spy DatasyncTask datasyncTask; + @Mock + TaskExecutionContext taskExecutionContext; + + @Spy + @InjectMocks + DatasyncHook datasyncHook; + @Mock DataSyncClient client; - MockedStatic datasyncHookMockedStatic; + @BeforeEach public void before() throws IllegalAccessException { - client = mock(DataSyncClient.class); - datasyncHookMockedStatic = mockStatic(DatasyncHook.class); - when(DatasyncHook.createClient()).thenReturn(client); - - DatasyncParameters DatasyncParameters = new DatasyncParameters(); - datasyncTask = initTask(DatasyncParameters); - datasyncTask.setHook(datasyncHook); - } - - @Test - public void testCreateTaskJson() { String jsonData = "{\n" + " \"CloudWatchLogGroupArn\": \"arn:aws:logs:ap-northeast-3:523202806641:log-group:/aws/datasync:*\",\n" + @@ -123,12 +121,16 @@ public void testCreateTaskJson() { " }\n" + " ]\n" + "}"; - DatasyncParameters DatasyncParameters = new DatasyncParameters(); - DatasyncParameters.setJsonFormat(true); - DatasyncParameters.setJson(jsonData); + DatasyncParameters parameters = new DatasyncParameters(); + parameters.setJson(jsonData); + parameters.setJsonFormat(true); + datasyncTask = initTask(JSONUtils.toJsonString(parameters)); + } + + @Test + public void testCreateTaskJson() { + DatasyncParameters datasyncParameters = datasyncTask.getParameters(); - DatasyncTask DatasyncTask = initTask(DatasyncParameters); - DatasyncParameters datasyncParameters = DatasyncTask.getParameters(); Assertions.assertEquals("arn:aws:logs:ap-northeast-3:523202806641:log-group:/aws/datasync:*", datasyncParameters.getCloudWatchLogGroupArn()); Assertions.assertEquals("task001", datasyncParameters.getName()); @@ -145,86 +147,102 @@ public void testCreateTaskJson() { Assertions.assertEquals("* * * * * ?", datasyncParameters.getSchedule().getScheduleExpression()); Assertions.assertEquals("aTime", datasyncParameters.getOptions().getAtime()); Assertions.assertEquals(Long.valueOf(10), datasyncParameters.getOptions().getBytesPerSecond()); - datasyncHookMockedStatic.close(); } @Test public void testCheckCreateTask() { - DatasyncHook hook = spy(new DatasyncHook()); CreateTaskResponse response = mock(CreateTaskResponse.class); - when(client.createTask((CreateTaskRequest) any())).thenReturn(response); SdkHttpResponse sdkMock = mock(SdkHttpResponse.class); + DescribeTaskResponse describeTaskResponse = mock(DescribeTaskResponse.class); + when(client.createTask((CreateTaskRequest) any())).thenReturn(response); when(response.sdkHttpResponse()).thenReturn(sdkMock); + when(describeTaskResponse.sdkHttpResponse()).thenReturn(sdkMock); when(sdkMock.isSuccessful()).thenReturn(true); when(response.taskArn()).thenReturn(mockTaskArn); + when(client.describeTask((DescribeTaskRequest) any())).thenReturn(describeTaskResponse); + when(describeTaskResponse.status()).thenReturn(TaskStatus.AVAILABLE); - doReturn(true).when(hook).doubleCheckTaskStatus(any(), any()); - hook.createDatasyncTask(datasyncTask.getParameters()); - Assertions.assertEquals(mockTaskArn, hook.getTaskArn()); - datasyncHookMockedStatic.close(); + Boolean flag = datasyncHook.createDatasyncTask(datasyncTask.getParameters()); + + Assertions.assertEquals(mockTaskArn, datasyncHook.getTaskArn()); + Assertions.assertTrue(flag); } @Test public void testStartTask() { - DatasyncHook hook = spy(new DatasyncHook()); StartTaskExecutionResponse response = mock(StartTaskExecutionResponse.class); - when(client.startTaskExecution((StartTaskExecutionRequest) any())).thenReturn(response); SdkHttpResponse sdkMock = mock(SdkHttpResponse.class); + DescribeTaskExecutionResponse describeTaskExecutionResponse = mock(DescribeTaskExecutionResponse.class); + + when(client.startTaskExecution((StartTaskExecutionRequest) any())).thenReturn(response); when(response.sdkHttpResponse()).thenReturn(sdkMock); when(sdkMock.isSuccessful()).thenReturn(true); when(response.taskExecutionArn()).thenReturn(mockExeArn); - doReturn(true).when(hook).doubleCheckExecStatus(any(), any()); - hook.startDatasyncTask(); - Assertions.assertEquals(mockExeArn, hook.getTaskExecArn()); - datasyncHookMockedStatic.close(); + when(describeTaskExecutionResponse.sdkHttpResponse()).thenReturn(sdkMock); + when(client.describeTaskExecution((DescribeTaskExecutionRequest) any())) + .thenReturn(describeTaskExecutionResponse); + when(describeTaskExecutionResponse.status()).thenReturn(TaskExecutionStatus.LAUNCHING); + Boolean executionFlag = datasyncHook.startDatasyncTask(); + + Assertions.assertEquals(mockExeArn, datasyncHook.getTaskExecArn()); + Assertions.assertTrue(executionFlag); } @Test public void testCancelTask() { - DatasyncHook hook = spy(new DatasyncHook()); CancelTaskExecutionResponse response = mock(CancelTaskExecutionResponse.class); - when(client.cancelTaskExecution((CancelTaskExecutionRequest) any())).thenReturn(response); SdkHttpResponse sdkMock = mock(SdkHttpResponse.class); + when(client.cancelTaskExecution((CancelTaskExecutionRequest) any())).thenReturn(response); when(response.sdkHttpResponse()).thenReturn(sdkMock); when(sdkMock.isSuccessful()).thenReturn(true); - Assertions.assertEquals(true, hook.cancelDatasyncTask()); - datasyncHookMockedStatic.close(); + Assertions.assertEquals(true, datasyncHook.cancelDatasyncTask()); } @Test public void testDescribeTask() { - DatasyncHook hook = spy(new DatasyncHook()); - doReturn(null).when(hook).queryDatasyncTaskStatus(); - Assertions.assertEquals(false, hook.doubleCheckTaskStatus(TaskStatus.AVAILABLE, DatasyncHook.taskFinishFlags)); + SdkHttpResponse sdkMock = mock(SdkHttpResponse.class); + DescribeTaskResponse failed = mock(DescribeTaskResponse.class); + DescribeTaskResponse available = mock(DescribeTaskResponse.class); - doReturn(TaskStatus.AVAILABLE).when(hook).queryDatasyncTaskStatus(); - Assertions.assertEquals(true, hook.doubleCheckTaskStatus(TaskStatus.AVAILABLE, DatasyncHook.taskFinishFlags)); - datasyncHookMockedStatic.close(); + when(client.describeTask((DescribeTaskRequest) any())).thenReturn(failed); + when(failed.sdkHttpResponse()).thenReturn(sdkMock); + when(sdkMock.isSuccessful()).thenReturn(true); + when(failed.status()).thenReturn(TaskStatus.UNKNOWN_TO_SDK_VERSION); + Assertions.assertEquals(false, + datasyncHook.doubleCheckTaskStatus(TaskStatus.AVAILABLE, DatasyncHook.taskFinishFlags)); + + when(client.describeTask((DescribeTaskRequest) any())).thenReturn(available); + when(available.sdkHttpResponse()).thenReturn(sdkMock); + when(sdkMock.isSuccessful()).thenReturn(true); + when(available.status()).thenReturn(TaskStatus.AVAILABLE); + Assertions.assertEquals(true, + datasyncHook.doubleCheckTaskStatus(TaskStatus.AVAILABLE, DatasyncHook.taskFinishFlags)); } @Test public void testDescribeTaskExec() { - DatasyncHook hook = spy(new DatasyncHook()); - doReturn(null).when(hook).queryDatasyncTaskExecStatus(); + SdkHttpResponse sdkMock = mock(SdkHttpResponse.class); + DescribeTaskExecutionResponse failed = mock(DescribeTaskExecutionResponse.class); + DescribeTaskExecutionResponse success = mock(DescribeTaskExecutionResponse.class); + + when(client.describeTaskExecution((DescribeTaskExecutionRequest) any())).thenReturn(failed); + when(failed.sdkHttpResponse()).thenReturn(sdkMock); + when(sdkMock.isSuccessful()).thenReturn(true); + when(failed.status()).thenReturn(TaskExecutionStatus.UNKNOWN_TO_SDK_VERSION); Assertions.assertEquals(false, - hook.doubleCheckExecStatus(TaskExecutionStatus.SUCCESS, DatasyncHook.doneStatus)); + datasyncHook.doubleCheckExecStatus(TaskExecutionStatus.SUCCESS, DatasyncHook.doneStatus)); - doReturn(TaskExecutionStatus.SUCCESS).when(hook).queryDatasyncTaskExecStatus(); - Assertions.assertEquals(true, hook.doubleCheckExecStatus(TaskExecutionStatus.SUCCESS, DatasyncHook.doneStatus)); - datasyncHookMockedStatic.close(); + when(client.describeTaskExecution((DescribeTaskExecutionRequest) any())).thenReturn(success); + when(success.sdkHttpResponse()).thenReturn(sdkMock); + when(sdkMock.isSuccessful()).thenReturn(true); + when(success.status()).thenReturn(TaskExecutionStatus.SUCCESS); + Assertions.assertEquals(true, + datasyncHook.doubleCheckExecStatus(TaskExecutionStatus.SUCCESS, DatasyncHook.doneStatus)); } - private DatasyncTask initTask(DatasyncParameters DatasyncParameters) { - TaskExecutionContext taskExecutionContext = createContext(DatasyncParameters); - DatasyncTask datasyncTask = new DatasyncTask(taskExecutionContext); + private DatasyncTask initTask(String contextJson) { + doReturn(contextJson).when(taskExecutionContext).getTaskParams(); datasyncTask.init(); return datasyncTask; } - - public TaskExecutionContext createContext(DatasyncParameters DatasyncParameters) { - String parameters = JSONUtils.toJsonString(DatasyncParameters); - TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); - Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters); - return taskExecutionContext; - } } diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 69aee6a0e2e3..b3238fc266b5 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -238,6 +238,13 @@ export default { confirm_to_offline: 'Confirm to make the workflow offline?', time_to_online: 'Confirm to make the Scheduler online?', time_to_offline: 'Confirm to make the Scheduler offline?', + warning_dependent_tasks_title: 'Warning', + warning_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to make the workflow offline?', + warning_dependencies: 'Dependencies:', + delete_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the workflow.', + warning_offline_scheduler_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to make the scheduler offline?', + delete_task_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the task.', + warning_delete_scheduler_dependent_tasks_desc: 'The downstream dependent tasks exists. Are you sure to delete the scheduler?', }, task: { on_line: 'Online', @@ -306,7 +313,8 @@ export default { startup_parameter: 'Startup Parameter', whether_dry_run: 'Whether Dry-Run', please_choose: 'Please Choose', - remove_task_cache: 'Clear cache' + remove_task_cache: 'Clear cache', + delete_validate_dependent_tasks_desc: 'The downstream dependent tasks exists. You can not delete the task.', }, dag: { create: 'Create Workflow', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 8b84cc7f135f..21d35e2a71b1 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -236,6 +236,13 @@ export default { confirm_to_offline: '是否确定下线该工作流?', time_to_online: '是否确定上线该定时?', time_to_offline: '是否确定下线该定时?', + warning_dependent_tasks_title: '警告', + warning_dependent_tasks_desc: '下游存在依赖, 下线操作可能会对下游任务产生影响. 你确定要下线该工作流嘛?', + warning_dependencies: '依赖如下:', + delete_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该工作流', + warning_offline_scheduler_dependent_tasks_desc: '下游存在依赖, 下线操作可能会对下游任务产生影响. 你确定要下线该定时嘛?', + delete_task_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该任务.', + warning_delete_scheduler_dependent_tasks_desc: '下游存在依赖, 删除定时可能会对下游任务产生影响. 你确定要删除该定时嘛?', }, task: { on_line: '线上', @@ -304,7 +311,8 @@ export default { startup_parameter: '启动参数', whether_dry_run: '是否空跑', please_choose: '请选择', - remove_task_cache: '清除缓存' + remove_task_cache: '清除缓存', + delete_validate_dependent_tasks_desc: '下游存在依赖,你不能删除该任务定义', }, dag: { create: '创建工作流', diff --git a/dolphinscheduler-ui/src/service/modules/lineages/index.ts b/dolphinscheduler-ui/src/service/modules/lineages/index.ts index 2eca2d18b0c5..d43e69276ce5 100644 --- a/dolphinscheduler-ui/src/service/modules/lineages/index.ts +++ b/dolphinscheduler-ui/src/service/modules/lineages/index.ts @@ -16,7 +16,7 @@ */ import { axios } from '@/service/service' -import { ProjectCodeReq, WorkflowCodeReq } from './types' +import {DependentTaskReq, ProjectCodeReq, WorkflowCodeReq} from './types' export function queryWorkFlowList(projectCode: ProjectCodeReq): any { return axios({ @@ -41,3 +41,11 @@ export function queryLineageByWorkFlowCode( method: 'get' }) } + +export function queryDependentTasks(projectCode: number, params: DependentTaskReq): any { + return axios({ + url: `/projects/${projectCode}/lineages/query-dependent-tasks`, + method: 'get', + params + }) +} \ No newline at end of file diff --git a/dolphinscheduler-ui/src/service/modules/lineages/types.ts b/dolphinscheduler-ui/src/service/modules/lineages/types.ts index 63294b7b7e74..843449105375 100644 --- a/dolphinscheduler-ui/src/service/modules/lineages/types.ts +++ b/dolphinscheduler-ui/src/service/modules/lineages/types.ts @@ -47,10 +47,15 @@ interface WorkflowRes { workFlowRelationList: WorkFlowRelationList[] } +interface DependentTaskReq extends WorkflowCodeReq { + taskCode?: number +} + export { ProjectCodeReq, WorkflowCodeReq, WorkFlowNameReq, + DependentTaskReq, WorkflowRes, WorkFlowListRes } diff --git a/dolphinscheduler-ui/src/views/projects/components/dependencies/dependencies-modal.tsx b/dolphinscheduler-ui/src/views/projects/components/dependencies/dependencies-modal.tsx new file mode 100644 index 000000000000..fbb09b307134 --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/components/dependencies/dependencies-modal.tsx @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + defineComponent, + PropType, + h, + ref, watch +} from 'vue' +import { useI18n } from 'vue-i18n' +import {NEllipsis, NModal, NSpace} from 'naive-ui' +import {IDefinitionData} from "@/views/projects/workflow/definition/types"; +import ButtonLink from "@/components/button-link"; + +const props = { + row: { + type: Object as PropType, + default: {}, + required: false + }, + show: { + type: Boolean as PropType, + default: false + }, + required: { + type: Boolean as PropType, + default: true + }, + taskLinks: { + type: Array, + default: [] + }, + content: { + type: String, + default: '' + } +} + +export default defineComponent({ + name: 'dependenciesConfirm', + props, + emits: ['update:show', 'update:row', 'confirm'], + setup(props, ctx) { + const { t } = useI18n() + + const showRef = ref(props.show) + + const confirmToHandle = () => { + ctx.emit('confirm') + } + + const cancelToHandle = () => { + ctx.emit('update:show', showRef) + } + + const renderDownstreamDependencies = () => { + return h( + +
{props.content}
+
{t('project.workflow.warning_dependencies')}
+ {props.taskLinks.map((item: any) => { + return ( + + {{ + default: () => + h(NEllipsis, + { + style: 'max-width: 350px;line-height: 1.5' + }, + () => item.text + ) + }} + + ) + })} +
+ ) + } + + watch(()=> props.show, + () => { + showRef.value = props.show + }) + + return {renderDownstreamDependencies, confirmToHandle, cancelToHandle, showRef} + }, + + render() { + const { t } = useI18n() + + return ( + + {{ + default: () => ( + this.renderDownstreamDependencies() + ) + }} + + ) + } +}) diff --git a/dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts b/dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts new file mode 100644 index 000000000000..0e35cfbb8945 --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/components/dependencies/use-dependencies.ts @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {DependentTaskReq} from "@/service/modules/lineages/types"; +import {queryDependentTasks} from "@/service/modules/lineages"; +import {TASK_TYPES_MAP} from "@/store/project"; + +export function useDependencies() { + + const getDependentTasksBySingleTask = async (projectCode: any, workflowCode: any, taskCode: any) => { + let tasks = [] as any + if (workflowCode && taskCode) { + let dependentTaskReq = {workFlowCode: workflowCode, taskCode: taskCode} as DependentTaskReq + const res = await queryDependentTasks(projectCode, dependentTaskReq) + res.filter((item: any) => item.processDefinitionCode !== workflowCode && item.taskType === TASK_TYPES_MAP.DEPENDENT.alias) + .forEach((item: any) => { + tasks.push(item.processDefinitionName + '->' + item.taskName) + }) + } + return tasks + } + + const getDependentTasksByWorkflow = async (projectCode: any, workflowCode: any) => { + let tasks = [] as any + if (workflowCode) { + let dependentTaskReq = {workFlowCode: workflowCode} as DependentTaskReq + const res = await queryDependentTasks(projectCode, dependentTaskReq) + res.filter((item: any) => item.processDefinitionCode !== workflowCode && item.taskType === TASK_TYPES_MAP.DEPENDENT.alias) + .forEach((item: any) => { + tasks.push(item.processDefinitionName + '->' + item.taskName) + }) + } + return tasks + } + + const getDependentTasksByMultipleTasks = async (projectCode: any, workflowCode: any, taskCodes: any[]) => { + let tasks = [] as any + if (workflowCode && taskCodes?.length>0) { + for(const taskCode of taskCodes) { + const res = await getDependentTasksBySingleTask(projectCode, workflowCode, taskCode) + if (res?.length >0) { + tasks = tasks.concat(res) + } + } + } + return tasks + } + + const getDependentTaskLinksByMultipleTasks = async (projectCode: any, workflowCode: any, taskCodes: any[]) => { + let dependentTaskLinks = [] as any + if (workflowCode && projectCode) { + for (const taskCode of taskCodes) { + await getDependentTaskLinksByTask(projectCode, workflowCode, taskCode).then((res: any) => { + dependentTaskLinks = dependentTaskLinks.concat(res) + }) + } + } + return dependentTaskLinks + } + + const getDependentTaskLinks = async (projectCode: any, workflowCode: any) => { + let dependentTaskReq = {workFlowCode: workflowCode} as DependentTaskReq + let dependentTaskLinks = [] as any + if (workflowCode && projectCode) { + await queryDependentTasks(projectCode, dependentTaskReq).then((res: any) => { + res.filter((item: any) => item.processDefinitionCode !== workflowCode && item.taskType === TASK_TYPES_MAP.DEPENDENT.alias) + .forEach((item: any) => { + dependentTaskLinks.push( + { + text: item.processDefinitionName + '->' + item.taskName, + show: true, + action: () => { + const url = `/projects/${item.projectCode}/workflow/definitions/${item.processDefinitionCode}` + window.open(url, '_blank') + }, + } + ) + }) + }) + } + return dependentTaskLinks + } + + const getDependentTaskLinksByTask = async (projectCode: any, workflowCode: any, taskCode: any) => { + let dependentTaskReq = {workFlowCode: workflowCode, taskCode: taskCode} as DependentTaskReq + let dependentTaskLinks = [] as any + if (workflowCode && projectCode) { + await queryDependentTasks(projectCode, dependentTaskReq).then((res: any) => { + res.filter((item: any) => item.processDefinitionCode !== workflowCode && item.taskType === TASK_TYPES_MAP.DEPENDENT.alias) + .forEach((item: any) => { + dependentTaskLinks.push( + { + text: item.processDefinitionName + '->' + item.taskName, + show: true, + action: () => { + const url = `/projects/${item.projectCode}/workflow/definitions/${item.processDefinitionCode}` + window.open(url, '_blank') + }, + } + ) + }) + }) + } + return dependentTaskLinks + } + + return { getDependentTasksBySingleTask, getDependentTasksByMultipleTasks, getDependentTaskLinks, getDependentTasksByWorkflow, getDependentTaskLinksByTask, getDependentTaskLinksByMultipleTasks } +} diff --git a/dolphinscheduler-ui/src/views/projects/task/definition/batch-task.tsx b/dolphinscheduler-ui/src/views/projects/task/definition/batch-task.tsx index 500d63ddb665..fc51cb84d01f 100644 --- a/dolphinscheduler-ui/src/views/projects/task/definition/batch-task.tsx +++ b/dolphinscheduler-ui/src/views/projects/task/definition/batch-task.tsx @@ -41,6 +41,7 @@ import Card from '@/components/card' import VersionModal from './components/version-modal' import TaskModal from '@/views/projects/task/components/node/detail-modal' import type { INodeData } from './types' +import DependenciesModal from "@/views/projects/components/dependencies/dependencies-modal"; const BatchTaskDefinition = defineComponent({ name: 'batch-task-definition', @@ -213,6 +214,13 @@ const BatchTaskDefinition = defineComponent({ readonly={this.taskReadonly} saving={this.taskSaving} /> + ) } diff --git a/dolphinscheduler-ui/src/views/projects/task/definition/use-table.ts b/dolphinscheduler-ui/src/views/projects/task/definition/use-table.ts index 78e59b9e70bb..341fa6108111 100644 --- a/dolphinscheduler-ui/src/views/projects/task/definition/use-table.ts +++ b/dolphinscheduler-ui/src/views/projects/task/definition/use-table.ts @@ -49,11 +49,15 @@ import type { } from '@/service/modules/task-definition/types' import type { IRecord } from './types' +import { useDependencies } from '../../components/dependencies/use-dependencies' + export function useTable(onEdit: Function) { const { t } = useI18n() const route = useRoute() const projectCode = Number(route.params.projectCode) + const {getDependentTaskLinksByTask} = useDependencies() + const createColumns = (variables: any) => { variables.columns = [ { @@ -260,22 +264,38 @@ export function useTable(onEdit: Function) { totalPage: ref(1), taskType: ref(null), showVersionModalRef: ref(false), + dependentTasksShowRef: ref(false), + dependentTaskLinksRef: ref([]), row: {}, - loadingRef: ref(false) + loadingRef: ref(false), + dependenciesData: ref({showRef: ref(false), taskLinks: ref([]), required: ref(false), tip: ref(''), action:() => {}}), }) const handleDelete = (row: any) => { - deleteTaskDefinition({ code: row.taskCode }, { projectCode }).then(() => { - getTableData({ - pageSize: variables.pageSize, - pageNo: - variables.tableData.length === 1 && variables.page > 1 - ? variables.page - 1 - : variables.page, - searchTaskName: variables.searchTaskName, - searchWorkflowName: variables.searchWorkflowName, - taskType: variables.taskType - }) + variables.row = row + getDependentTaskLinksByTask(projectCode, row.processDefinitionCode, row.taskCode).then((res: any) =>{ + if (res && res.length > 0) { + variables.dependenciesData = { + showRef: true, + taskLinks: res, + tip: t('project.workflow.delete_validate_dependent_tasks_desc'), + required: true, + action: () => {} + } + } else { + deleteTaskDefinition({ code: row.taskCode }, { projectCode }).then(() => { + getTableData({ + pageSize: variables.pageSize, + pageNo: + variables.tableData.length === 1 && variables.page > 1 + ? variables.page - 1 + : variables.page, + searchTaskName: variables.searchTaskName, + searchWorkflowName: variables.searchWorkflowName, + taskType: variables.taskType + }) + }) + } }) } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx index 3fd3ca0b5a8a..517a67d0afbf 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx @@ -23,7 +23,8 @@ import { useRoute } from 'vue-router' import styles from './menu.module.scss' import { uuid } from '@/common/common' import { IWorkflowTaskInstance } from './types' -import { NButton } from 'naive-ui' +import {NButton} from 'naive-ui' +import {useDependencies} from "@/views/projects/components/dependencies/use-dependencies" const props = { startDisplay: { @@ -57,6 +58,10 @@ const props = { top: { type: Number as PropType, default: 0 + }, + dependenciesData: { + type: Object as PropType, + require: false } } @@ -77,6 +82,12 @@ export default defineComponent({ const graph = inject('graph', ref()) const route = useRoute() const projectCode = Number(route.params.projectCode) + const workflowCode = Number(route.params.code) + const { t } = useI18n() + + const { getDependentTaskLinksByTask } = useDependencies() + + const dependenciesData = props.dependenciesData const hide = () => { ctx.emit('hide', false) @@ -134,9 +145,19 @@ export default defineComponent({ }) } - const handleDelete = () => { - graph.value?.removeCell(props.cell) - ctx.emit('removeTasks', [Number(props.cell?.id)]) + const handleDelete = async () => { + let taskCode = props.cell?.id + let res = await getDependentTaskLinksByTask(projectCode, workflowCode, taskCode) + dependenciesData.showRef = false + if (res.length > 0) { + dependenciesData.showRef = true + dependenciesData.taskLinks = res + dependenciesData.tip = t('project.task.delete_validate_dependent_tasks_desc') + dependenciesData.required = true + } else { + graph.value?.removeCell(props.cell) + ctx.emit('removeTasks', [Number(props.cell?.id)]) + } } onMounted(() => { @@ -189,8 +210,8 @@ export default defineComponent({ {t('project.node.copy')} {t('project.node.delete')} diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-toolbar.tsx b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-toolbar.tsx index d7d517b86737..5dc191db9b73 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-toolbar.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-toolbar.tsx @@ -49,6 +49,7 @@ import type { Graph } from '@antv/x6' import StartupParam from './dag-startup-param' import VariablesView from '@/views/projects/workflow/instance/components/variables-view' import { WorkflowDefinition, WorkflowInstance } from './types' +import { useDependencies } from "@/views/projects/components/dependencies/use-dependencies" const props = { layoutToggle: { @@ -64,6 +65,10 @@ const props = { // The same as the structure responsed by the queryProcessDefinitionByCode api type: Object as PropType, default: null + }, + dependenciesData: { + type: Object as PropType, + require: false } } @@ -79,6 +84,11 @@ export default defineComponent({ const graph = inject>('graph', ref()) const router = useRouter() const route = useRoute() + const projectCode = Number(route.params.projectCode) + const workflowCode = Number(route.params.code) + const { getDependentTaskLinksByMultipleTasks } = useDependencies() + + const dependenciesData = props.dependenciesData /** * Node search and navigate @@ -164,15 +174,23 @@ export default defineComponent({ /** * Delete selected edges and nodes */ - const removeCells = () => { + const removeCells = async () => { if (graph.value) { const cells = graph.value.getSelectedCells() if (cells) { const codes = cells .filter((cell) => cell.isNode()) .map((cell) => +cell.id) - context.emit('removeTasks', codes, cells) - graph.value?.removeCells(cells) + const res = await getDependentTaskLinksByMultipleTasks(projectCode, workflowCode, codes) + if (res.length > 0) { + dependenciesData.showRef = true + dependenciesData.taskLinks = res + dependenciesData.tip = t('project.task.delete_validate_dependent_tasks_desc') + dependenciesData.required = true + } else { + context.emit('removeTasks', codes, cells) + graph.value?.removeCells(cells) + } } } } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx index c41485db2176..e04afecd09d8 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx @@ -24,7 +24,7 @@ import { toRef, watch, onBeforeUnmount, - computed + computed, reactive } from 'vue' import { useI18n } from 'vue-i18n' import { useRoute } from 'vue-router' @@ -57,6 +57,7 @@ import utils from '@/utils' import { useUISettingStore } from '@/store/ui-setting/ui-setting' import { executeTask } from '@/service/modules/executors' import { removeTaskInstanceCache } from '@/service/modules/task-instances' +import DependenciesModal from "@/views/projects/components/dependencies/dependencies-modal"; const props = { // If this prop is passed, it means from definition detail @@ -333,6 +334,13 @@ export default defineComponent({ } } + const dependenciesData = reactive({ + showRef: ref(false), + taskLinks: ref([]), + required: ref(false), + tip: ref(''), action: () => {} + }) + watch( () => props.definition, () => { @@ -373,6 +381,7 @@ export default defineComponent({ onSaveModelToggle={saveModelToggle} onRemoveTasks={removeTasks} onRefresh={refreshTaskStatus} + v-model:dependenciesData={dependenciesData} />
@@ -428,6 +437,14 @@ export default defineComponent({ onViewLog={handleViewLog} onExecuteTask={handleExecuteTask} onRemoveTaskInstanceCache={handleRemoveTaskInstanceCache} + v-model:dependenciesData={dependenciesData} + /> + {!!props.definition && ( @@ -95,6 +96,7 @@ export default defineComponent({ const handleReleaseScheduler = () => { ctx.emit('releaseScheduler') } + return { handleEditWorkflow, handleStartWorkflow, @@ -114,6 +116,7 @@ export default defineComponent({ const releaseState = this.row?.releaseState const scheduleReleaseState = this.row?.scheduleReleaseState const schedule = this.row?.schedule + return ( @@ -166,10 +169,7 @@ export default defineComponent({ trigger: () => ( {{ - default: () => - releaseState === 'ONLINE' - ? t('project.workflow.confirm_to_offline') - : t('project.workflow.confirm_to_online'), + default: () => releaseState === 'OFFLINE' ? t('project.workflow.confirm_to_online'):t('project.workflow.confirm_to_offline'), trigger: () => ( {{ default: () => - scheduleReleaseState === 'ONLINE' - ? t('project.workflow.time_to_offline') - : t('project.workflow.time_to_online'), + scheduleReleaseState === 'OFFLINE' ? t('project.workflow.time_to_online'):t('project.workflow.time_to_offline'), trigger: () => ( + ) } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/definition/timing/index.tsx b/dolphinscheduler-ui/src/views/projects/workflow/definition/timing/index.tsx index bd348fb8d7b8..9450138e0f4f 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/definition/timing/index.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/definition/timing/index.tsx @@ -24,6 +24,7 @@ import { useTable } from './use-table' import Card from '@/components/card' import TimingModal from '../components/timing-modal' import type { Router } from 'vue-router' +import DependenciesModal from "@/views/projects/components/dependencies/dependencies-modal"; export default defineComponent({ name: 'WorkflowDefinitionTiming', @@ -115,6 +116,13 @@ export default defineComponent({ v-model:show={this.showRef} onUpdateList={this.handleUpdateList} /> + ) } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/definition/timing/use-table.ts b/dolphinscheduler-ui/src/views/projects/workflow/definition/timing/use-table.ts index a339e20e46dc..4c86637d04ca 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/definition/timing/use-table.ts +++ b/dolphinscheduler-ui/src/views/projects/workflow/definition/timing/use-table.ts @@ -39,15 +39,18 @@ import { import { format } from 'date-fns-tz' import { ISearchParam } from './types' import type { Router } from 'vue-router' +import { useDependencies } from "@/views/projects/components/dependencies/use-dependencies" export function useTable() { const { t } = useI18n() const router: Router = useRouter() + const {getDependentTaskLinks} = useDependencies() + const variables = reactive({ columns: [], tableWidth: DefaultTableWidth, - row: {}, + row: {} as any, tableData: [], projectCode: ref(Number(router.currentRoute.value.params.projectCode)), page: ref(1), @@ -58,7 +61,8 @@ export function useTable() { loadingRef: ref(false), processDefinitionCode: router.currentRoute.value.params.definitionCode ? ref(Number(router.currentRoute.value.params.definitionCode)) - : ref() + : ref(), + dependenciesData: ref({showRef: false, taskLinks: ref([]), required: ref(false), tip: ref(''), action:() => {}}), }) const renderTime = (time: string, timeZone: string) => { @@ -329,7 +333,7 @@ export function useTable() { NPopconfirm, { onPositiveClick: () => { - handleDelete(row.id) + handleDelete(row) } }, { @@ -344,7 +348,8 @@ export function useTable() { { circle: true, type: 'error', - size: 'small' + size: 'small', + disabled: row.releaseState === 'ONLINE' }, { icon: () => h(DeleteOutlined) @@ -387,12 +392,43 @@ export function useTable() { } const handleReleaseState = (row: any) => { - let handle = online if (row.releaseState === 'ONLINE') { - handle = offline + variables.row = row + getDependentTaskLinks(variables.projectCode, row.processDefinitionCode).then((res: any) =>{ + if (res && res.length > 0) { + variables.dependenciesData.showRef = true + variables.dependenciesData.taskLinks = res + variables.dependenciesData.tip = t('project.workflow.warning_delete_scheduler_dependent_tasks_desc') + variables.dependenciesData.required = false + variables.dependenciesData.action = confirmToOfflineSchedule + } else { + offline(variables.projectCode, row.id).then(() => { + window.$message.success(t('project.workflow.success')) + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal, + projectCode: variables.projectCode, + processDefinitionCode: variables.processDefinitionCode + }) + }) + }}) + } else { + online(variables.projectCode, row.id).then(() => { + window.$message.success(t('project.workflow.success')) + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal, + projectCode: variables.projectCode, + processDefinitionCode: variables.processDefinitionCode + }) + }) } + } - handle(variables.projectCode, row.id).then(() => { + const confirmToOfflineSchedule = () => { + offline(variables.projectCode, variables.row.id).then(() => { window.$message.success(t('project.workflow.success')) getTableData({ pageSize: variables.pageSize, @@ -402,14 +438,11 @@ export function useTable() { processDefinitionCode: variables.processDefinitionCode }) }) + variables.dependenciesData.showRef = false } - const handleDelete = (id: number) => { - /* after deleting data from the current page, you need to jump forward when the page is empty. */ - if (variables.tableData.length === 1 && variables.page > 1) { - variables.page -= 1 - } - deleteScheduleById(id, variables.projectCode).then(() => { + const confirmToDeleteSchedule = () => { + deleteScheduleById(variables.row.id, variables.projectCode).then(() => { window.$message.success(t('project.workflow.success')) getTableData({ pageSize: variables.pageSize, @@ -419,6 +452,35 @@ export function useTable() { processDefinitionCode: variables.processDefinitionCode }) }) + variables.dependenciesData.showRef = false + } + + const handleDelete = (row: any) => { + /* after deleting data from the current page, you need to jump forward when the page is empty. */ + if (variables.tableData.length === 1 && variables.page > 1) { + variables.page -= 1 + } + variables.row = row + getDependentTaskLinks(variables.projectCode, row.processDefinitionCode).then((res: any) =>{ + if (res && res.length > 0) { + variables.dependenciesData.showRef = true + variables.dependenciesData.taskLinks = res + variables.dependenciesData.tip = t('project.workflow.warning_delete_scheduler_dependent_tasks_desc') + variables.dependenciesData.required = false + variables.dependenciesData.action = confirmToDeleteSchedule + } else { + deleteScheduleById(row.id, variables.projectCode).then(() => { + window.$message.success(t('project.workflow.success')) + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal, + projectCode: variables.projectCode, + processDefinitionCode: variables.processDefinitionCode + }) + }) + } + }) } return { diff --git a/dolphinscheduler-ui/src/views/projects/workflow/definition/use-table.ts b/dolphinscheduler-ui/src/views/projects/workflow/definition/use-table.ts index 4fe520aa1cd3..04228d532f50 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/definition/use-table.ts +++ b/dolphinscheduler-ui/src/views/projects/workflow/definition/use-table.ts @@ -43,11 +43,14 @@ import { import type { IDefinitionParam } from './types' import type { Router } from 'vue-router' import type { TableColumns, RowKey } from 'naive-ui/es/data-table/src/interface' +import {useDependencies} from '../../components/dependencies/use-dependencies' export function useTable() { const { t } = useI18n() const router: Router = useRouter() const { copy } = useTextCopy() + const { getDependentTaskLinks } = useDependencies() + const variables = reactive({ columns: [], tableWidth: DefaultTableWidth, @@ -67,7 +70,8 @@ export function useTable() { versionShowRef: ref(false), copyShowRef: ref(false), loadingRef: ref(false), - setTimingDialogShowRef: ref(false) + setTimingDialogShowRef: ref(false), + dependenciesData: ref({showRef: false, taskLinks: ref([]), required: ref(false), tip: ref(''), action:() => {}}), }) const createColumns = (variables: any) => { @@ -304,17 +308,6 @@ export function useTable() { variables.row = row } - const deleteWorkflow = (row: any) => { - deleteByCode(variables.projectCode, row.code).then(() => { - window.$message.success(t('project.workflow.success')) - getTableData({ - pageSize: variables.pageSize, - pageNo: variables.page, - searchVal: variables.searchVal - }) - }) - } - const batchDeleteWorkflow = () => { const data = { codes: _.join(variables.checkedRowKeys, ',') @@ -354,48 +347,142 @@ export function useTable() { const batchCopyWorkflow = () => {} - const releaseWorkflow = (row: any) => { + const confirmToOfflineWorkflow = () => { + const row: any = variables.row const data = { name: row.name, releaseState: (row.releaseState === 'ONLINE' ? 'OFFLINE' : 'ONLINE') as - | 'OFFLINE' - | 'ONLINE' + | 'OFFLINE' + | 'ONLINE' } + if (data.releaseState === 'OFFLINE') { + release(data, variables.projectCode, row.code).then(() => { + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal + }) + window.$message.success(t('project.workflow.success')) + }) + } + variables.dependenciesData.showRef = false + } + + const confirmToOfflineScheduler = () => { + const row: any = variables.row + offline(variables.projectCode, row.schedule.id).then(() => { + window.$message.success(t('project.workflow.success')) + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal + }) + }) + variables.dependenciesData.showRef = false + } - release(data, variables.projectCode, row.code).then(() => { - if (data.releaseState === 'ONLINE') { + const releaseWorkflow = (row: any) => { + const data = { + name: row.name, + releaseState: (row.releaseState === 'ONLINE' ? 'OFFLINE' : 'ONLINE') as + | 'OFFLINE' + | 'ONLINE' + } + variables.row = row + if (data.releaseState === 'ONLINE') { + release(data, variables.projectCode, row.code).then(() => { variables.setTimingDialogShowRef = true - variables.row = row if (row?.schedule) { variables.row = row.schedule variables.timingType = 'update' variables.timingState = row.scheduleReleaseState } + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal + }) + }) + } else { + getDependentTaskLinks(variables.projectCode, row.code).then((res: any) => { + if (res && res.length > 0) { + variables.dependenciesData = { + showRef: true, + taskLinks: res, + tip: t('project.workflow.warning_dependent_tasks_desc'), + required: false, + action: confirmToOfflineWorkflow + } + } else { + release(data, variables.projectCode, row.code).then(() => { + window.$message.success(t('project.workflow.success')) + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal + }) + }) + } + }) + } + } + + const deleteWorkflow = (row: any) => { + getDependentTaskLinks(variables.projectCode, row.code).then((res: any) => { + if (res && res.length > 0) { + variables.dependenciesData = { + showRef: true, + taskLinks: res, + tip: t('project.workflow.delete_validate_dependent_tasks_desc'), + required: true, + action: () => {} + } } else { - window.$message.success(t('project.workflow.success')) + deleteByCode(variables.projectCode, row.code).then(() => { + window.$message.success(t('project.workflow.success')) + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal + }) + }) } - getTableData({ - pageSize: variables.pageSize, - pageNo: variables.page, - searchVal: variables.searchVal - }) }) } const releaseScheduler = (row: any) => { + variables.row = row if (row.schedule) { - let handle = online if (row.schedule.releaseState === 'ONLINE') { - handle = offline - } - handle(variables.projectCode, row.schedule.id).then(() => { - window.$message.success(t('project.workflow.success')) - getTableData({ - pageSize: variables.pageSize, - pageNo: variables.page, - searchVal: variables.searchVal + getDependentTaskLinks(variables.projectCode, row.code).then((res: any) => { + if (res && res.length > 0) { + variables.dependenciesData = { + showRef: true, + taskLinks: res, + tip: t('project.workflow.warning_offline_scheduler_dependent_tasks_desc'), + required: false, + action: confirmToOfflineScheduler + } + } else { + offline(variables.projectCode, row.schedule.id).then(() => { + window.$message.success(t('project.workflow.success')) + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal + }) + }) + }}) + } else { + online(variables.projectCode, row.schedule.id).then(() => { + window.$message.success(t('project.workflow.success')) + getTableData({ + pageSize: variables.pageSize, + pageNo: variables.page, + searchVal: variables.searchVal + }) }) - }) + } } } @@ -479,6 +566,6 @@ export function useTable() { getTableData, batchDeleteWorkflow, batchExportWorkflow, - batchCopyWorkflow + batchCopyWorkflow, } } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/timing/index.tsx b/dolphinscheduler-ui/src/views/projects/workflow/timing/index.tsx index 1228cfd8352c..37a33447d5f3 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/timing/index.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/timing/index.tsx @@ -15,14 +15,15 @@ * limitations under the License. */ -import { NDataTable, NPagination, NSpace } from 'naive-ui' -import { defineComponent, onMounted, toRefs, watch } from 'vue' +import {NDataTable, NPagination, NSpace} from 'naive-ui' +import {defineComponent, onMounted, toRefs, watch} from 'vue' import { useI18n } from 'vue-i18n' import { useTable } from '../definition/timing/use-table' import Card from '@/components/card' import TimingModal from '../definition/components/timing-modal' import TimingCondition from '@/views/projects/workflow/timing/components/timing-condition' import { ITimingSearch } from '@/views/projects/workflow/timing/types' +import DependenciesModal from "@/views/projects/components/dependencies/dependencies-modal"; export default defineComponent({ name: 'WorkflowTimingList', @@ -110,6 +111,13 @@ export default defineComponent({ v-model:show={this.showRef} onUpdateList={this.handleUpdateList} /> + ) } diff --git a/dolphinscheduler-ui/src/views/resource/task-group/option/use-table.ts b/dolphinscheduler-ui/src/views/resource/task-group/option/use-table.ts index fc286d7d0cc2..838a1d39cbcd 100644 --- a/dolphinscheduler-ui/src/views/resource/task-group/option/use-table.ts +++ b/dolphinscheduler-ui/src/views/resource/task-group/option/use-table.ts @@ -147,6 +147,7 @@ export function useTable( parseTime(item.updateTime), 'yyyy-MM-dd HH:mm:ss' ) + item.status = (item.status == 'YES') ? 1 : 0 return { ...item }