Skip to content

Commit

Permalink
Refactor Master
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Mar 5, 2024
1 parent e3bd263 commit af58c54
Show file tree
Hide file tree
Showing 75 changed files with 1,895 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.apache.dolphinscheduler.server.master.dag;

public abstract class BasicDAG<E> implements DAG<E> {

private Map<String, DAGNode<E>> dagNodeMap;

@Override
public List<DAGNode<E>> getAllPostNodes(DAGNode<E> dagNode) {
final String nodeName = dagNode.getNodeName();
if (!dagNodeMap.containsKey(nodeName)) {
return Collections.emptyList();
}
DAGNode<E> node = dagNodeMap.get(nodeName);
List<DAGNode<E>> dagNodes = new ArrayList<>();
for (DAGEdge edge : node.getOutDegrees()) {
if (dagNodeMap.containsKey(edge.getToNodeName())) {
dagNodes.add(dagNodeMap.get(edge.getToNodeName()));
}
}
return dagNodes;
}

@Override
public List<DAGNode<E>> getAllParentNodes(DAGNode<E> dagNode) {
final String nodeName = dagNode.getNodeName();
if (!dagNodeMap.containsKey(nodeName)) {
return Collections.emptyList();
}
DAGNode<E> node = dagNodeMap.get(nodeName);
List<DAGNode<E>> dagNodes = new ArrayList<>();
for (DAGEdge edge : node.getOutDegrees()) {
if (dagNodeMap.containsKey(edge.getFromNodeName())) {
dagNodes.add(dagNodeMap.get(edge.getFromNodeName()));
}
}
return dagNodes;
}

@Override
public DAGNode<E> getDAGNode(String nodeName) {
return dagNodeMap.get(nodeName);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.apache.dolphinscheduler.server.master.dag;

import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;

/**
* The Directed Acyclic Graph class
*
* @param <E> type of the node content.
*/
public interface DAG<E> {

List<DAGNode<E>> getAllPostNodes(DAGNode<E> dagNode);

List<DAGNode<E>> getAllParentNodes(DAGNode<E> dagNode);

DAGNode<E> getDAGNode(String node);

/**
* The node of the DAG.
*
* @param <E> content type of the node.
*/
@Data
@AllArgsConstructor
class DAGNode<E> {

private String nodeName;
private E nodeContent;

private List<DAGEdge> inDegrees;
private List<DAGEdge> outDegrees;
}

/**
* The edge of the DAG.
*/
@Data
@AllArgsConstructor
class DAGEdge {

private String fromNodeName;
private String toNodeName;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.apache.dolphinscheduler.server.master.dag;

import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.events.IEventRepository;
import org.apache.dolphinscheduler.server.master.events.TaskOperationEvent;
import org.apache.dolphinscheduler.server.master.events.TaskOperationType;

import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Builder
@AllArgsConstructor
public class DAGEngine implements IDAGEngine {

@Getter
private final IWorkflowExecutionDAG workflowExecutionDAG;

private final List<TaskTriggerConditionChecker> taskTriggerConditionCheckers;

private final ITaskExecutionRunnableFactory taskExecutionRunnableFactory;

private final IEventRepository eventRepository;

@Override
public void triggerNextTasks(String parentTaskNodeName) {
workflowExecutionDAG.getWorkflowDAG().getPostNodeNames(parentTaskNodeName).forEach(this::triggerTask);
}

@Override
@SneakyThrows
public void triggerTask(String taskName) {
for (TaskTriggerConditionChecker taskTriggerConditionChecker : taskTriggerConditionCheckers) {
if (!taskTriggerConditionChecker.taskCanTrigger(taskName)) {
return;
}
}
// todo: create Task ExecutionRunnable
TaskExecutionRunnable taskExecuteRunnable = workflowExecutionDAG.createTaskExecutionRunnable(taskName);
TaskInstance taskInstance = taskExecuteRunnable.getTaskExecutionContext().getTaskInstance();
TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder().workflowInstanceId(taskInstance.getId())
.taskInstanceId(taskInstance.getId()).taskOperationType(TaskOperationType.DISPATCH).build();
eventRepository.storeEventToTail(taskOperationEvent);
}

@Override
public void pauseTask(Integer taskInstanceId) {
TaskExecutionRunnable taskExecutionRunnable = workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
if (taskExecutionRunnable == null) {
log.error("Cannot find the ITaskExecutionRunnable for: {}", taskInstanceId);
return;
}
TaskInstance taskInstance = taskExecutionRunnable.getTaskExecutionContext().getTaskInstance();
TaskOperationEvent taskOperationEvent =
TaskOperationEvent.builder().workflowInstanceId(taskInstance.getProcessInstanceId())
.taskInstanceId(taskInstance.getId()).taskOperationType(TaskOperationType.PAUSE).build();
eventRepository.storeEventToTail(taskOperationEvent);
}

@Override
public void killTask(Integer taskInstanceId) {
TaskExecutionRunnable taskExecutionRunnable = workflowExecutionDAG.getTaskExecutionRunnableById(taskInstanceId);
if (taskExecutionRunnable == null) {
log.error("Cannot find the ITaskExecutionRunnable for: {}", taskInstanceId);
return;
}

TaskInstance taskInstance = taskExecutionRunnable.getTaskExecutionContext().getTaskInstance();
TaskOperationEvent taskOperationEvent = TaskOperationEvent.builder().workflowInstanceId(taskInstance.getId())
.taskInstanceId(taskInstance.getId()).taskOperationType(TaskOperationType.KILL).build();
eventRepository.storeEventToTail(taskOperationEvent);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.apache.dolphinscheduler.server.master.dag;

import org.apache.dolphinscheduler.server.master.events.IEventRepository;

import java.util.List;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DAGEngineFactory implements IDAGEngineFactory {

@Autowired
private IWorkflowExecutionDAGFactory workflowExecutionDAGFactory;

@Autowired
private List<TaskTriggerConditionChecker> taskTriggerConditionCheckers;

@Autowired
private ITaskExecutionRunnableFactory taskExecutionRunnableFactory;

@Override
public IDAGEngine createDAGEngine(WorkflowExecutionContext workflowExecutionContext,
IEventRepository eventRepository) {
IWorkflowExecutionDAG workflowExecutionDAG =
workflowExecutionDAGFactory.createWorkflowExecutionDAG(workflowExecutionContext);
return DAGEngine.builder()
.workflowExecutionDAG(workflowExecutionDAG)
.taskExecutionRunnableFactory(taskExecutionRunnableFactory)
.taskTriggerConditionCheckers(taskTriggerConditionCheckers)
.eventRepository(eventRepository)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.dolphinscheduler.server.master.dag;

/**
* The IDAGEngine is responsible for triggering, killing, pausing, and finalizing task in {@link IWorkflowExecutionDAG}.
* <p>All DAG operation should directly use the method in IDAGEngine, new {@link IWorkflowExecutionDAG} should be triggered by new IDAGEngine.
*/
public interface IDAGEngine {

/**
* Trigger the tasks which are post of the given task.
* <P> If there are no task after the given taskNode, will try to finish the WorkflowExecutionRunnable.
* <p> If the
*
* @param parentTaskNodeName the parent task name
*/
void triggerNextTasks(String parentTaskNodeName);

/**
* Trigger the given task
*
* @param taskName task name
*/
void triggerTask(String taskName);

/**
* Pause the given task.
*/
void pauseTask(Integer taskId);

/**
* Kill the given task.
*/
void killTask(Integer taskId);

/**
* Get {@link IWorkflowExecutionDAG} belong to the Engine.
*
* @return workflow execution DAG.
*/
IWorkflowExecutionDAG getWorkflowExecutionDAG();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.dolphinscheduler.server.master.dag;

import org.apache.dolphinscheduler.server.master.events.IEventRepository;

public interface IDAGEngineFactory {

IDAGEngine createDAGEngine(WorkflowExecutionContext workflowExecutionContext, IEventRepository eventRepository);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.dolphinscheduler.server.master.dag;

import org.apache.dolphinscheduler.server.master.events.IEventRepository;

public interface IEventRepositoryFactory {

IEventRepository createEventRepository();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.apache.dolphinscheduler.server.master.dag;

import org.apache.dolphinscheduler.server.master.events.IEventRepository;

public interface IEventfulExecutionRunnable {

IEventRepository getEventRepository();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.apache.dolphinscheduler.server.master.dag;

public interface ITaskExecutionRunnable {

void dispatch();

void kill();

void pause();

TaskExecutionContext getTaskExecutionContext();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.dolphinscheduler.server.master.dag;

public interface ITaskExecutionRunnableFactory {

TaskExecutionRunnable createTaskExecutionRunnable(TaskExecutionContext taskExecutionContext);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.apache.dolphinscheduler.server.master.dag;

import org.apache.dolphinscheduler.dao.entity.TaskDefinition;

import java.util.List;

public interface IWorkflowDAG extends DAG<TaskDefinition> {

/**
* Return the post task name of given parentTaskName.
*
* @param parentTaskName parent task name, can be null.
* @return post task name list, sort by priority.
*/
List<String> getPostNodeNames(String parentTaskName);

/**
* Get the pre task name of given taskName
*
* @param taskName task name can be null.
* @return parent task name list.
*/
List<String> getParentNodeNames(String taskName);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.dolphinscheduler.server.master.dag;

public interface IWorkflowDAGFactory {

IWorkflowDAG buildWorkflowDAG(WorkflowIdentify workflowIdentify);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.apache.dolphinscheduler.server.master.dag;

import org.apache.dolphinscheduler.server.master.exception.WorkflowExecuteRunnableNotFoundException;

/**
* The WorkflowEngine is responsible for starting, stopping, pausing, and finalizing workflows.
*/
public interface IWorkflowEngine {

/**
* Start the workflow engine.
*/
void start();

/**
* Trigger a workflow to start.
*
* @param workflowExecuteRunnable the workflow to start
*/
void triggerWorkflow(IWorkflowExecutionRunnable workflowExecuteRunnable);

/**
* Pause a workflow instance.
*
* @param workflowInstanceId the ID of the workflow to pause
* @throws WorkflowExecuteRunnableNotFoundException if the workflow is not found
*/
void pauseWorkflow(Integer workflowInstanceId);

/**
* Kill a workflow instance.
*
* @param workflowInstanceId the ID of the workflow to stop
* @throws WorkflowExecuteRunnableNotFoundException if the workflow is not found
*/
void killWorkflow(Integer workflowInstanceId);

/**
* Finalize a workflow instance. Once a workflow has been finalized, then it cannot receive new operation, and will be removed from memory.
*
* @param workflowInstanceId the ID of the workflow to finalize
*/
void finalizeWorkflow(Integer workflowInstanceId);

/**
* Stop the workflow engine.
*/
void stop();
}
Loading

0 comments on commit af58c54

Please sign in to comment.