Skip to content

Commit

Permalink
[Improvement-16574][Master] Move some task operations into ITaskExecu…
Browse files Browse the repository at this point in the history
…tionRunnable (#16575)
  • Loading branch information
ruanwenjun authored Sep 4, 2024
1 parent 3a24d17 commit 0f4bce1
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO
.startNodes(backfillWorkflowDTO.getStartNodes())
.failureStrategy(backfillWorkflowDTO.getFailureStrategy())
.taskDependType(backfillWorkflowDTO.getTaskDependType())
.execType(backfillWorkflowDTO.getExecType())
.warningType(backfillWorkflowDTO.getWarningType())
.warningGroupId(backfillWorkflowDTO.getWarningGroupId())
.workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.extract.master.transportor.workflow;

import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
Expand Down Expand Up @@ -55,8 +54,6 @@ public class WorkflowBackfillTriggerRequest {
@Builder.Default
private TaskDependType taskDependType = TaskDependType.TASK_POST;

private CommandType execType;

@Builder.Default
private WarningType warningType = WarningType.NONE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ public void handle(final ITaskStateAction taskStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable,
final TaskPauseLifecycleEvent taskPauseEvent) {
if (!taskExecutionRunnable.isTaskInstanceInitialized()) {
taskExecutionRunnable.initializeTaskInstance();
}
taskStateAction.pauseEventAction(workflowExecutionRunnable, taskExecutionRunnable, taskPauseEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void handle(final IWorkflowExecutionRunnable workflowExecutionRunnable,
// So we need to initialize the task instance here.
// Otherwise, we cannot find the statemachine by task instance state.
if (!taskExecutionRunnable.isTaskInstanceInitialized()) {
taskExecutionRunnable.initializeTaskInstance();
taskExecutionRunnable.initializeFirstRunTaskInstance();
}
taskTimeoutMonitor(taskExecutionRunnable);
super.handle(workflowExecutionRunnable, taskStartLifecycleEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,48 @@ public interface ITaskExecutionRunnable
extends
Comparable<ITaskExecutionRunnable> {

String getName();
/**
 * Get the name of this runnable, which is the name of its task definition.
 *
 * @return the name reported by {@code getTaskDefinition()}
 */
default String getName() {
    final String taskDefinitionName = getTaskDefinition().getName();
    return taskDefinitionName;
}

/**
* Whether the task instance is initialized.
* <p> If the ITaskExecutionRunnable is never triggered, it is not initialized.
* <p> If the ITaskExecutionRunnable is created by failover or recovery, then it is initialized.
*/
boolean isTaskInstanceInitialized();

void initializeTaskInstance();
/**
* Initialize the task instance with {@link FirstRunTaskInstanceFactory}
*/
void initializeFirstRunTaskInstance();

boolean isTaskInstanceNeedRetry();
/**
* Whether the task instance can still be retried, i.e. its retry times is less than its max retry times.
*/
boolean isTaskInstanceCanRetry();

void initializeRetryTaskInstance();
/**
* Retry the TaskExecutionRunnable.
* <p> Will create retry task instance and start it.
*/
void retry();

void initializeFailoverTaskInstance();
/**
* Failover the TaskExecutionRunnable.
* <p> The failover logic is judged by the task instance state.
*/
void failover();

/**
* Pause the TaskExecutionRunnable.
*/
void pause();

/**
* Kill the TaskExecutionRunnable.
*/
void kill();

WorkflowEventBus getWorkflowEventBus();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,22 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskKillLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskPauseLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskStartLifecycleEvent;
import org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory;

import org.apache.commons.lang3.StringUtils;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -62,24 +73,20 @@ public TaskExecutionRunnable(TaskExecutionRunnableBuilder taskExecutionRunnableB
this.workflowInstance = checkNotNull(taskExecutionRunnableBuilder.getWorkflowInstance());
this.taskDefinition = checkNotNull(taskExecutionRunnableBuilder.getTaskDefinition());
this.taskInstance = taskExecutionRunnableBuilder.getTaskInstance();
if (taskInstance != null) {
if (isTaskInstanceInitialized()) {
initializeTaskExecutionContext();
}
}

/**
 * Get the name of this runnable, taken from the task definition it was built with.
 *
 * @return the task definition name
 */
@Override
public String getName() {
    final String taskDefinitionName = taskDefinition.getName();
    return taskDefinitionName;
}

/**
 * Tell whether this runnable already holds a task instance.
 *
 * @return {@code true} when the {@code taskInstance} field is non-null, {@code false} otherwise
 */
@Override
public boolean isTaskInstanceInitialized() {
    final boolean initialized = taskInstance != null;
    return initialized;
}

@Override
public void initializeTaskInstance() {
checkState(taskInstance == null, "The task instance is not null, should not initialize again.");
public void initializeFirstRunTaskInstance() {
checkState(!isTaskInstanceInitialized(),
"The task instance is already initialized, can't initialize first run task.");
this.taskInstance = applicationContext.getBean(TaskInstanceFactories.class)
.firstRunTaskInstanceFactory()
.builder()
Expand All @@ -90,35 +97,50 @@ public void initializeTaskInstance() {
}

@Override
public boolean isTaskInstanceNeedRetry() {
public boolean isTaskInstanceCanRetry() {
return taskInstance.getRetryTimes() < taskInstance.getMaxRetryTimes();
}

@Override
public void initializeRetryTaskInstance() {
checkState(taskInstance != null, "The task instance can't retry, should not initialize retry task instance.");
public void retry() {
checkState(isTaskInstanceInitialized(), "The task instance is not initialized, can't initialize retry task.");
this.taskInstance = applicationContext.getBean(TaskInstanceFactories.class)
.retryTaskInstanceFactory()
.builder()
.withTaskInstance(taskInstance)
.build();
initializeTaskExecutionContext();
getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(this));
}

@Override
public void initializeFailoverTaskInstance() {
checkState(taskInstance != null,
"The task instance can't failover, should not initialize failover task instance.");
public void failover() {
checkState(isTaskInstanceInitialized(), "The task instance is not initialized, can't failover.");
if (takeOverTaskFromExecutor()) {
log.info("Failover task success, the task {} has been taken-over from executor", taskInstance.getName());
return;
}
this.taskInstance = applicationContext.getBean(TaskInstanceFactories.class)
.failoverTaskInstanceFactory()
.builder()
.withTaskInstance(taskInstance)
.build();
initializeTaskExecutionContext();
getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(this));
}

/**
 * Request a pause of this runnable by publishing a {@link TaskPauseLifecycleEvent}
 * for it on the workflow event bus.
 */
@Override
public void pause() {
    final WorkflowEventBus eventBus = getWorkflowEventBus();
    eventBus.publish(TaskPauseLifecycleEvent.of(this));
}

/**
 * Request a kill of this runnable by publishing a {@link TaskKillLifecycleEvent}
 * for it on the workflow event bus.
 */
@Override
public void kill() {
    final WorkflowEventBus eventBus = getWorkflowEventBus();
    eventBus.publish(TaskKillLifecycleEvent.of(this));
}

private void initializeTaskExecutionContext() {
checkState(taskInstance != null, "The task instance is null, can't initialize TaskExecutionContext.");
checkState(isTaskInstanceInitialized(), "The task instance is null, can't initialize TaskExecutionContext.");
final TaskExecutionContextCreateRequest request = TaskExecutionContextCreateRequest.builder()
.workflowDefinition(workflowDefinition)
.workflowInstance(workflowInstance)
Expand All @@ -129,6 +151,32 @@ private void initializeTaskExecutionContext() {
.createTaskExecutionContext(request);
}

/**
 * Try to take over the current task instance from the executor recorded as its host.
 * <p> No request is sent for logic tasks or when the task instance has no host; both cases return {@code false}.
 * <p> Any exception raised while building or sending the request is logged and treated as a failed take-over.
 *
 * @return {@code true} only if the executor answered the take-over request with success
 * @throws IllegalStateException if the task instance is not initialized
 */
private boolean takeOverTaskFromExecutor() {
    checkState(isTaskInstanceInitialized(), "The task instance is null, can't take over from executor.");
    if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) {
        return false;
    }
    final String executorHost = taskInstance.getHost();
    if (StringUtils.isEmpty(executorHost)) {
        log.debug("Task: {} host is empty, cannot take over the task from executor(This is normal case).",
                taskInstance.getName());
        return false;
    }
    try {
        // The request carries this master's address as the workflow host.
        final TakeOverTaskRequest request = TakeOverTaskRequest.builder()
                .taskInstanceId(taskInstance.getId())
                .workflowHost(applicationContext.getBean(MasterConfig.class).getMasterAddress())
                .build();
        return Clients
                .withService(ITaskInstanceOperator.class)
                .withHost(executorHost)
                .takeOverTask(request)
                .isSuccess();
    } catch (Exception ex) {
        log.warn("Take over task: {} failed", taskInstance.getName(), ex);
        return false;
    }
}

@Override
public int compareTo(ITaskExecutionRunnable other) {
if (other == null) {
Expand Down Expand Up @@ -159,6 +207,9 @@ public int compareTo(ITaskExecutionRunnable other) {

/**
 * Render this runnable as {@code TaskExecutionRunnable{name=...}}, appending
 * {@code , state=...} when a task instance is present.
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("TaskExecutionRunnable{name=").append(getName());
    if (taskInstance != null) {
        description.append(", state=").append(taskInstance.getState());
    }
    return description.append('}').toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,7 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.AbstractLifecycleEvent;
Expand Down Expand Up @@ -170,7 +165,7 @@ public void failedEventAction(final IWorkflowExecutionRunnable workflowExecution
releaseTaskInstanceResourcesIfNeeded(taskExecutionRunnable);
persistentTaskInstanceFailedEventToDB(taskExecutionRunnable, taskFailedEvent);

if (taskExecutionRunnable.isTaskInstanceNeedRetry()) {
if (taskExecutionRunnable.isTaskInstanceCanRetry()) {
taskExecutionRunnable.getWorkflowEventBus().publish(TaskRetryLifecycleEvent.of(taskExecutionRunnable));
return;
}
Expand Down Expand Up @@ -227,44 +222,7 @@ protected void persistentTaskInstanceSuccessEventToDB(final ITaskExecutionRunnab
* <p> If the take-over fails, will generate a failover task-instance and mark the task instance status to {@link TaskExecutionStatus#NEED_FAULT_TOLERANCE}.
*/
protected void failoverTask(final ITaskExecutionRunnable taskExecutionRunnable) {
if (!taskExecutionRunnable.isTaskInstanceInitialized()) {
throw new IllegalStateException("The task instance hasn't been initialized, cannot take over the task");
}
if (takeOverTask(taskExecutionRunnable)) {
log.info("Failover task success, the task {} has been taken-over", taskExecutionRunnable.getName());
return;
}
taskExecutionRunnable.initializeFailoverTaskInstance();
tryToDispatchTask(taskExecutionRunnable);
log.info("Failover task success, the task {} has been resubmitted.", taskExecutionRunnable.getName());
}

private boolean takeOverTask(final ITaskExecutionRunnable taskExecutionRunnable) {
if (!taskExecutionRunnable.isTaskInstanceInitialized()) {
log.debug("Task: {} doesn't initialized yet, cannot take over the task", taskExecutionRunnable.getName());
return false;
}
if (TaskTypeUtils.isLogicTask(taskExecutionRunnable.getTaskInstance().getTaskType())) {
return false;
}
if (StringUtils.isEmpty(taskExecutionRunnable.getTaskInstance().getHost())) {
log.debug("Task: {} host is empty, cannot take over the task", taskExecutionRunnable.getName());
return false;
}
try {
final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder()
.taskInstanceId(taskExecutionRunnable.getTaskInstance().getId())
.workflowHost(masterConfig.getMasterAddress())
.build();
final TakeOverTaskResponse takeOverTaskResponse = Clients
.withService(ITaskInstanceOperator.class)
.withHost(taskExecutionRunnable.getTaskInstance().getHost())
.takeOverTask(takeOverTaskRequest);
return takeOverTaskResponse.isSuccess();
} catch (Exception ex) {
log.warn("Take over task: {} failed", taskExecutionRunnable.getName(), ex);
return false;
}
taskExecutionRunnable.failover();
}

protected void tryToDispatchTask(final ITaskExecutionRunnable taskExecutionRunnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,14 @@ public void retryEventAction(final IWorkflowExecutionRunnable workflowExecutionR
throwExceptionIfStateIsNotMatch(taskExecutionRunnable);
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
// check the retry times
if (!taskExecutionRunnable.isTaskInstanceNeedRetry()) {
if (!taskExecutionRunnable.isTaskInstanceCanRetry()) {
log.info("The task: {} cannot retry, because the retry times: {} is over the max retry times: {}",
taskInstance.getName(),
taskInstance.getRetryTimes(),
taskInstance.getMaxRetryTimes());
return;
}
taskExecutionRunnable.initializeRetryTaskInstance();
taskExecutionRunnable.getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(taskExecutionRunnable));
taskExecutionRunnable.retry();
}

@Override
Expand Down Expand Up @@ -117,7 +116,7 @@ public void pausedEventAction(final IWorkflowExecutionRunnable workflowExecution
// This case happens when the task has failed but is still waiting in the delay retry queue.
// We don't remove the event from GlobalWorkflowDelayEventCoordinator; the event should be dropped when the task is
// killed.
if (taskExecutionRunnable.isTaskInstanceNeedRetry()
if (taskExecutionRunnable.isTaskInstanceCanRetry()
&& workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
workflowExecutionGraph.markTaskExecutionRunnableChainPause(taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
Expand All @@ -143,7 +142,7 @@ public void killedEventAction(final IWorkflowExecutionRunnable workflowExecution
// This case happens when the task has failed but is still waiting in the delay retry queue.
// We don't remove the event from GlobalWorkflowDelayEventCoordinator; the event should be dropped when the task is
// killed.
if (taskExecutionRunnable.isTaskInstanceNeedRetry()
if (taskExecutionRunnable.isTaskInstanceCanRetry()
&& workflowExecutionGraph.isTaskExecutionRunnableActive(taskExecutionRunnable)) {
workflowExecutionGraph.markTaskExecutionRunnableChainKill(taskExecutionRunnable);
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ private void generateNextBackfillCommand(final BackfillWorkflowCommandParam comm
.startNodes(commandParam.getStartNodes())
.failureStrategy(workflowInstance.getFailureStrategy())
.taskDependType(workflowInstance.getTaskDependType())
.execType(CommandType.COMPLEMENT_DATA)
.warningType(workflowInstance.getWarningType())
.warningGroupId(workflowInstance.getWarningGroupId())
.workflowInstancePriority(workflowInstance.getProcessInstancePriority())
Expand Down
Loading

0 comments on commit 0f4bce1

Please sign in to comment.