Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wuchao committed Oct 22, 2024
1 parent 3075350 commit 6da1983
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public WorkflowExecutionRunnable handleCommand(final Command command) {
.withCommand(command);

assembleWorkflowDefinition(workflowExecuteContextBuilder);
assembleProject(workflowExecuteContextBuilder);
assembleWorkflowGraph(workflowExecuteContextBuilder);
assembleWorkflowInstance(workflowExecuteContextBuilder);
assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder);
Expand Down Expand Up @@ -111,10 +112,6 @@ protected void assembleWorkflowDefinition(
checkArgument(workflowDefinition != null,
"Cannot find the WorkflowDefinition: [" + workflowDefinitionCode + ":" + workflowDefinitionVersion
+ "]");
Project project = projectDao.queryByCode(workflowDefinition.getProjectCode());
if (project != null) {
workflowDefinition.setProjectName(project.getName());
}
workflowExecuteContextBuilder.setWorkflowDefinition(workflowDefinition);

}
Expand Down Expand Up @@ -155,4 +152,12 @@ protected List<TaskInstance> getValidTaskInstance(final WorkflowInstance workflo
workflowInstance.getTestFlag());
}

protected void assembleProject(
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
final WorkflowDefinition workflowDefinition = workflowExecuteContextBuilder.getWorkflowDefinition();
final Project project = projectDao.queryByCode(workflowDefinition.getProjectCode());
checkArgument(project != null, "Cannot find the project code: " + workflowDefinition.getProjectCode());
workflowExecuteContextBuilder.setProject(project);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilde
.builder()
.workflowExecutionGraph(workflowExecutionGraph)
.workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
.project(workflowExecuteContextBuilder.getProject())
.workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
.taskDefinition(workflowGraph.getTaskNodeByName(task))
.workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.engine.task.runnable;

import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand All @@ -35,5 +36,6 @@ public class TaskExecutionContextCreateRequest {
private WorkflowInstance workflowInstance;
private TaskDefinition taskDefinition;
private TaskInstance taskInstance;
private Project project;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class TaskExecutionRunnable implements ITaskExecutionRunnable {
@Getter
private final WorkflowDefinition workflowDefinition;
@Getter
private final Project project;
@Getter

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
ITaskExecutionRunnable.getWorkflowInstance
; it is advisable to add an Override annotation.
private final WorkflowInstance workflowInstance;
@Getter
private TaskInstance taskInstance;
Expand All @@ -70,6 +73,7 @@ public TaskExecutionRunnable(TaskExecutionRunnableBuilder taskExecutionRunnableB
this.workflowExecutionGraph = checkNotNull(taskExecutionRunnableBuilder.getWorkflowExecutionGraph());
this.workflowEventBus = checkNotNull(taskExecutionRunnableBuilder.getWorkflowEventBus());
this.workflowDefinition = checkNotNull(taskExecutionRunnableBuilder.getWorkflowDefinition());
this.project = checkNotNull(taskExecutionRunnableBuilder.getProject());
this.workflowInstance = checkNotNull(taskExecutionRunnableBuilder.getWorkflowInstance());
this.taskDefinition = checkNotNull(taskExecutionRunnableBuilder.getTaskDefinition());
this.taskInstance = taskExecutionRunnableBuilder.getTaskInstance();
Expand Down Expand Up @@ -144,6 +148,7 @@ private void initializeTaskExecutionContext() {
checkState(isTaskInstanceInitialized(), "The task instance is null, can't initialize TaskExecutionContext.");
final TaskExecutionContextCreateRequest request = TaskExecutionContextCreateRequest.builder()
.workflowDefinition(workflowDefinition)
.project(project)
.workflowInstance(workflowInstance)
.taskDefinition(taskDefinition)
.taskInstance(taskInstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.engine.task.runnable;

import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand All @@ -37,6 +38,7 @@ public class TaskExecutionRunnableBuilder {

private final IWorkflowExecutionGraph workflowExecutionGraph;
private final WorkflowDefinition workflowDefinition;
private final Project project;
private final WorkflowInstance workflowInstance;
private final TaskDefinition taskDefinition;
private final TaskInstance taskInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.dolphinscheduler.dao.entity.DqRule;
import org.apache.dolphinscheduler.dao.entity.DqRuleExecuteSql;
import org.apache.dolphinscheduler.dao.entity.DqRuleInputEntry;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
Expand Down Expand Up @@ -100,6 +101,7 @@ public TaskExecutionContext createTaskExecutionContext(TaskExecutionContextCreat
TaskInstance taskInstance = request.getTaskInstance();
WorkflowInstance workflowInstance = request.getWorkflowInstance();
WorkflowDefinition workflowDefinition = request.getWorkflowDefinition();
Project project = request.getProject();

ResourceParametersHelper resources = TaskPluginManager.getTaskChannel(taskInstance.getTaskType())
.parseParameters(taskInstance.getTaskParams())
Expand All @@ -110,9 +112,10 @@ public TaskExecutionContext createTaskExecutionContext(TaskExecutionContextCreat

AbstractParameters baseParam =
TaskPluginManager.parseTaskParameters(taskInstance.getTaskType(), taskInstance.getTaskParams());

Map<String, Property> propertyMap =
curingParamsService.paramParsingPreparation(taskInstance, baseParam, workflowInstance,
workflowDefinition);
project.getName(), workflowDefinition.getName());
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildWorkflowInstanceHost(masterConfig.getMasterAddress())
.buildTaskInstanceRelatedInfo(taskInstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
Expand All @@ -40,6 +41,8 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext {

private final WorkflowDefinition workflowDefinition;

private final Project project;

private final WorkflowInstance workflowInstance;

private final IWorkflowGraph workflowGraph;
Expand Down Expand Up @@ -72,6 +75,8 @@ public static class WorkflowExecuteContextBuilder {

private List<IWorkflowLifecycleListener> workflowInstanceLifecycleListeners;

private Project project;

public WorkflowExecuteContextBuilder withCommand(Command command) {
this.command = command;
return this;
Expand All @@ -81,6 +86,7 @@ public WorkflowExecuteContext build() {
return new WorkflowExecuteContext(
command,
workflowDefinition,
project,
workflowInstance,
workflowGraph,
workflowExecutionGraph,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public static class WorkflowTriggerDTO {

private final WorkflowDefinition workflowDefinition;

private final Project project;

private final RunWorkflowCommandParam runWorkflowCommandParam;

@Builder.Default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,4 +709,30 @@ public void testStartWorkflowFromStartNodes_with_threeParallelSuccessTask() {

assertThat(workflowRepository.getAll()).isEmpty();
}

@Test
@DisplayName("Test start a workflow with one fake task success")
public void testStartWorkflow_with_oneSuccessFakeTask() {
final String yaml = "/it/start/workflow_with_one_fake_task_success.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(context.getWorkflows().get(0))
.project(context.getProject())
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(workflowTriggerDTO.getProject().getName())
.isEqualTo("MasterIntegrationTest");
Assertions
.assertThat(workflowTriggerDTO.getWorkflowDefinition().getName())
.isEqualTo("workflow_with_one_fake_task_success");
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
Expand Down Expand Up @@ -77,12 +76,15 @@ String curingGlobalParams(Integer workflowInstanceId, Map<String, String> global
* @param parameters
* @param taskInstance
* @param workflowInstance
* @param projectName
* @param workflowDefinitionName
* @return
*/
Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance,
@NonNull AbstractParameters parameters,
@NonNull WorkflowInstance workflowInstance,
@NonNull WorkflowDefinition workflowDefinition);
String projectName,
String workflowDefinitionName);

/**
* Parse workflow star parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.entity.ProjectParameter;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.mapper.ProjectParameterMapper;
import org.apache.dolphinscheduler.extract.master.command.ICommandParam;
Expand Down Expand Up @@ -181,13 +180,16 @@ public Map<String, Property> parseWorkflowFatherParam(@Nullable Map<String, Stri
* @param taskInstance
* @param parameters
* @param workflowInstance
* @param projectName
* @param workflowDefinitionName
* @return
*/
@Override
public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance,
@NonNull AbstractParameters parameters,
@NonNull WorkflowInstance workflowInstance,
@NonNull WorkflowDefinition workflowDefinition) {
String projectName,
String workflowDefinitionName) {
Map<String, Property> prepareParamsMap = new HashMap<>();

// assign value to definedParams here
Expand All @@ -208,7 +210,7 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI

// built-in params
Map<String, String> builtInParams =
setBuiltInParamsMap(taskInstance, workflowInstance, timeZone, workflowDefinition);
setBuiltInParamsMap(taskInstance, workflowInstance, timeZone, projectName, workflowDefinitionName);

// project-level params
Map<String, Property> projectParams = getProjectParameterMap(taskInstance.getProjectCode());
Expand Down Expand Up @@ -276,11 +278,14 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI
*
* @param taskInstance
* @param timeZone
* @param projectName
* @param workflowDefinitionName
*/
private Map<String, String> setBuiltInParamsMap(@NonNull TaskInstance taskInstance,
WorkflowInstance workflowInstance,
String timeZone,
WorkflowDefinition workflowDefinition) {
String projectName,
String workflowDefinitionName) {
CommandType commandType = workflowInstance.getCmdTypeIfComplement();
Date scheduleTime = workflowInstance.getScheduleTime();

Expand All @@ -293,9 +298,9 @@ private Map<String, String> setBuiltInParamsMap(@NonNull TaskInstance taskInstan
params.put(PARAMETER_TASK_DEFINITION_NAME, taskInstance.getName());
params.put(PARAMETER_TASK_DEFINITION_CODE, Long.toString(taskInstance.getTaskCode()));
params.put(PARAMETER_WORKFLOW_INSTANCE_ID, Integer.toString(taskInstance.getWorkflowInstanceId()));
params.put(PARAMETER_WORKFLOW_DEFINITION_NAME, workflowDefinition.getName());
params.put(PARAMETER_WORKFLOW_DEFINITION_NAME, workflowDefinitionName);
params.put(PARAMETER_WORKFLOW_DEFINITION_CODE, Long.toString(workflowInstance.getWorkflowDefinitionCode()));
params.put(PARAMETER_PROJECT_NAME, workflowDefinition.getProjectName());
params.put(PARAMETER_PROJECT_NAME, projectName);
params.put(PARAMETER_PROJECT_CODE, Long.toString(workflowInstance.getProjectCode()));
return params;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand Down Expand Up @@ -209,6 +210,10 @@ public void testParamParsingPreparation() {
workflowDefinition.setProjectCode(3000001L);
workflowDefinition.setCode(200001L);

Project project = new Project();
project.setName("ProjectName");
project.setCode(3000001L);

workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
workflowInstance.setProjectCode(workflowDefinition.getProjectCode());
taskInstance.setTaskCode(taskDefinition.getCode());
Expand All @@ -222,7 +227,7 @@ public void testParamParsingPreparation() {

Map<String, Property> propertyMap =
dolphinSchedulerCuringGlobalParams.paramParsingPreparation(taskInstance, parameters, workflowInstance,
workflowDefinition);
project.getName(), workflowDefinition.getName());
Assertions.assertNotNull(propertyMap);
Assertions.assertEquals(propertyMap.get(TaskConstants.PARAMETER_TASK_INSTANCE_ID).getValue(),
String.valueOf(taskInstance.getId()));
Expand Down

0 comments on commit 6da1983

Please sign in to comment.