Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-16612][Master] For logical tasks on the Master, there should be support for dry run #16616

Merged
merged 17 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner.execute;

import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import static org.apache.dolphinscheduler.common.constants.Constants.DRY_RUN_FLAG_YES;

import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
Expand Down Expand Up @@ -103,6 +104,15 @@ public void run() {
TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();
beforeExecute();

if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis());
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
logicTaskInstanceExecutionEventSenderManager.successEventSender().sendMessage(taskExecutionContext);
log.info(
"The current execute mode is dry run, will stop the logic task and set the taskInstance status to success");
return;
}
Comment on lines +107 to +115
Copy link
Member

@ruanwenjun ruanwenjun Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to move this after beforeExecute I am not sure if the statemachine will throw exception, since the task lifecycle might miss running event, so the task state will be dispatched, and dispatched task cannot be converted to success.

Copy link
Contributor Author

@shouwangyw shouwangyw Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to move this after beforeExecute I am not sure if the statemachine will throw exception, since the task lifecycle might miss running event, so the task state will be dispatched, and dispatched task cannot be converted to success.

I find the implementation in WorkerTaskExecutor,this is before beforeExecute.

Copy link
Contributor Author

@shouwangyw shouwangyw Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test Result:
image
This is not the effect we want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have change, test is ok

image image

TaskInstanceLogHeader.printExecuteTaskHeader();
executeTask();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.integration;

import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand Down Expand Up @@ -61,6 +62,7 @@ public Integer manualTriggerWorkflow(final WorkflowTriggerDTO workflowTriggerDTO
.workflowDefinitionVersion(workflowTriggerDTO.workflowDefinition.getVersion())
.startNodes(workflowTriggerDTO.getRunWorkflowCommandParam().getStartNodes())
.startParamList(workflowTriggerDTO.getRunWorkflowCommandParam().getCommandParams())
.dryRun(workflowTriggerDTO.dryRun)
.build();

final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
Expand Down Expand Up @@ -139,6 +141,9 @@ public static class WorkflowTriggerDTO {
private final WorkflowDefinition workflowDefinition;

private final RunWorkflowCommandParam runWorkflowCommandParam;

@Builder.Default
private Flag dryRun = Flag.NO;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,50 @@ public void testStartWorkflow_with_oneSuccessTask() {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
});
});

assertThat(workflowRepository.getAll()).isEmpty();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) dry run success")
public void testStartWorkflow_with_oneSuccessTaskDryRun() {
final String yaml = "/it/start/workflow_with_one_fake_task_success.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getWorkflows().get(0);

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.dryRun(Flag.YES)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode());
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
});
});

Expand Down Expand Up @@ -121,7 +159,9 @@ public void testStartWorkflow_with_subWorkflowTask_success() {
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO);
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.NO.getCode());

final List<WorkflowInstance> subWorkflowInstance =
repository.queryWorkflowInstance(context.getWorkflows().get(1));
Expand All @@ -131,6 +171,7 @@ public void testStartWorkflow_with_subWorkflowTask_success() {
.satisfiesExactly(workflowInstance -> {
assertThat(workflowInstance.getState()).isEqualTo(WorkflowExecutionStatus.SUCCESS);
assertThat(workflowInstance.getIsSubWorkflow()).isEqualTo(Flag.YES);
assertThat(workflowInstance.getDryRun()).isEqualTo(Flag.NO.getCode());
});

Assertions
Expand All @@ -151,6 +192,51 @@ public void testStartWorkflow_with_subWorkflowTask_success() {
assertThat(workflowRepository.getAll()).isEmpty();
}

@Test
@DisplayName("Test start a workflow with one sub workflow task(A) dry run, will not execute")
public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the parent workflow using dry run but still generate sub workflow instance? If use dry run then the SubWorkflow task will not generate sub workflow instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the parent workflow using dry run but still generate sub workflow instance? If use dry run then the SubWorkflow task will not generate sub workflow instance.

Child workflow inherits the dry run configuration of the parent workflow.
This is indeed the effect, and the sub-workflow will also dry run.

image image

Copy link
Member

@ruanwenjun ruanwenjun Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might make confusion, since physical task dry run will not go into the task plugin logic, you can find the implementation in WorkerTaskExecutor. It's better to keep this consistent.

You should do this change in MasterTaskExecutor, otherwise the other logic task still doesn't support dry run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might make confusion, since physical task dry run will not go into the task plugin logic, you can find the implementation in WorkerTaskExecutor. It's better to keep this consistent.

You should do this change in MasterTaskExecutor, otherwise the other logic task still doesn't support dry run.

OK, I'll check it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might make confusion, since physical task dry run will not go into the task plugin logic, you can find the implementation in WorkerTaskExecutor. It's better to keep this consistent.

You should do this change in MasterTaskExecutor, otherwise the other logic task still doesn't support dry run.

Check whether it is dry run in AsyncMasterTaskDelayQueueLooper, it may also be necessary!

final String yaml = "/it/start/workflow_with_sub_workflow_task_success.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getWorkflows().get(0);

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.dryRun(Flag.YES)
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {

Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS)
.matches(
workflowInstance -> workflowInstance.getIsSubWorkflow() == Flag.NO)
.matches(
workflowInstance -> workflowInstance.getDryRun() == Flag.YES.getCode());

final List<WorkflowInstance> subWorkflowInstance =
repository.queryWorkflowInstance(context.getWorkflows().get(1));
Assertions
.assertThat(subWorkflowInstance)
.isEmpty();

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.satisfiesExactly(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("sub_logic_task");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
assertThat(taskInstance.getDryRun()).isEqualTo(Flag.YES.getCode());
});
});

assertThat(workflowRepository.getAll()).isEmpty();
}

@Test
@DisplayName("Test start a workflow with one sub workflow task(A) failed")
public void testStartWorkflow_with_subWorkflowTask_failed() {
Expand Down
Loading