Skip to content

Commit

Permalink
Refactor Master
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Mar 9, 2024
1 parent e984300 commit 1f3d676
Show file tree
Hide file tree
Showing 97 changed files with 4,626 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

Expand All @@ -44,11 +45,10 @@
import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.Strings;

/**
* process instance
*/
@NoArgsConstructor
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("t_ds_process_instance")
public class ProcessInstance {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public interface ProcessTaskRelationLogDao extends IDao<ProcessTaskRelationLog>

List<ProcessTaskRelationLog> queryByWorkflowDefinitionCode(long workflowDefinitionCode);

List<ProcessTaskRelationLog> queryByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode,
int workflowDefinitionVerison);

void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public List<ProcessTaskRelationLog> queryByWorkflowDefinitionCode(long workflowD
return mybatisMapper.queryByProcessCode(workflowDefinitionCode);
}

@Override
public List<ProcessTaskRelationLog> queryByWorkflowDefinitionCodeAndVersion(long workflowDefinitionCode,
int workflowDefinitionVerison) {
return mybatisMapper.queryByProcessCodeAndVersion(workflowDefinitionCode, workflowDefinitionVerison);
}

@Override
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
mybatisMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.dag;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import lombok.experimental.SuperBuilder;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@SuperBuilder
public abstract class BasicDAG<E> implements DAG<E> {

protected Map<String, DAGNode<E>> dagNodeMap;

@Override
public List<DAGNode<E>> getDirectPostNodes(DAGNode<E> dagNode) {
final String nodeName = dagNode.getNodeName();
if (!dagNodeMap.containsKey(nodeName)) {
return Collections.emptyList();
}
DAGNode<E> node = dagNodeMap.get(nodeName);
List<DAGNode<E>> dagNodes = new ArrayList<>();
for (DAGEdge edge : node.getOutDegrees()) {
if (dagNodeMap.containsKey(edge.getToNodeName())) {
dagNodes.add(dagNodeMap.get(edge.getToNodeName()));
}
}
return dagNodes;
}

@Override
public List<DAGNode<E>> getDirectPreNodes(DAGNode<E> dagNode) {
final String nodeName = dagNode.getNodeName();
if (!dagNodeMap.containsKey(nodeName)) {
return Collections.emptyList();
}
DAGNode<E> node = dagNodeMap.get(nodeName);
List<DAGNode<E>> dagNodes = new ArrayList<>();
for (DAGEdge edge : node.getInDegrees()) {
if (dagNodeMap.containsKey(edge.getFromNodeName())) {
dagNodes.add(dagNodeMap.get(edge.getFromNodeName()));
}
}
return dagNodes;
}

@Override
public DAGNode<E> getDAGNode(String nodeName) {
return dagNodeMap.get(nodeName);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.dag;

import java.util.List;
import java.util.stream.Collectors;

/**
* The Directed Acyclic Graph class.
* <p>
* The DAG is a directed graph, which contains the nodes and the edges, the nodeName is the unique identifier of the node.
* The nodes are the tasks, the edges are the dependencies between the tasks.
* The DAG is acyclic, which means there is no cycle in the graph.
* The DAG is a directed graph, which means the edges have direction.
*
* @param <E> type of the node content.
*/
public interface DAG<E> {

/**
* Get the direct post node of given dagNode, if the dagNode is null, return the nodes which doesn't have inDegrees.
* e.g. The DAG is:
* <pre>
* {@code
* 1 -> 2 -> 3
* 4 -> 5
* 6
* }
* </pre>
* <li> The post node of 1 is 2.
* <li> The post node of 3 is empty.
* <li> The post node of null is 1,4,6.
*
* @param dagNode the node of the DAG, can be null.
* @return post node list, sort by priority.
*/
List<DAGNode<E>> getDirectPostNodes(DAGNode<E> dagNode);

/**
* Same with {@link #getDirectPostNodes(DAGNode)}.
* <p>
* If the dagNodeName is null, return the nodes which doesn't have inDegrees. Otherwise, return the post nodes of
* the given dagNodeName. If the dagNodeName is not null and cannot find the node in DAG, throw IllegalArgumentException.
*
* @param dagNodeName task name, can be null.
* @return post task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
default List<DAGNode<E>> getDirectPostNodes(String dagNodeName) {
DAGNode<E> dagNode = getDAGNode(dagNodeName);
if (dagNodeName != null && dagNode == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPostNodes(dagNode);
}

/**
* Same with {@link #getDirectPostNodes(String)}. Return the post node names.
*
* @param dagNodeName task name, can be null.
* @return post task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
default List<String> getDirectPostNodeNames(String dagNodeName) {
DAGNode<E> dagNode = getDAGNode(dagNodeName);
if (dagNodeName != null && dagNode == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPostNodes(dagNode).stream().map(DAGNode::getNodeName).collect(Collectors.toList());
}

/**
* Get the direct pre node of given dagNode, if the dagNode is null, return the nodes which doesn't have outDegrees.
* e.g. The DAG is:
* <pre>
* {@code
* 1 -> 2 -> 3
* 4 -> 5
* 6
* }
* </pre>
* <li> The pre node of 1 is empty.
* <li> The pre node of 3 is 2.
* <li> The pre node of null is 3,5,6.
*
* @param dagNode the node of the DAG, can be null.
* @return pre node list, sort by priority.
*/
List<DAGNode<E>> getDirectPreNodes(DAGNode<E> dagNode);

/**
* Same with {@link #getDirectPreNodes(DAGNode)}.
* <p>
* If the dagNodeName is null, return the nodes which doesn't have outDegrees. Otherwise, return the pre nodes of
* the given dagNodeName. If the dagNodeName is not null and cannot find the node in DAG, throw IllegalArgumentException.
*
* @param dagNodeName task name, can be null.
* @return pre task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
default List<DAGNode<E>> getDirectPreNodes(String dagNodeName) {
DAGNode<E> dagNode = getDAGNode(dagNodeName);
if (dagNodeName != null && dagNode == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPreNodes(dagNode);
}

/**
* Same with {@link #getDirectPreNodes(String)}. Return the pre node names.
*
* @param dagNodeName task name, can be null.
* @return pre task name list, sort by priority.
* @throws IllegalArgumentException if the dagNodeName is not null and cannot find the node in DAG.
*/
default List<String> getDirectPreNodeNames(String dagNodeName) {
DAGNode<E> dagNode = getDAGNode(dagNodeName);
if (dagNodeName != null && dagNode == null) {
throw new IllegalArgumentException("Cannot find the Node: " + dagNodeName + " in DAG");
}
return getDirectPreNodes(dagNode).stream().map(DAGNode::getNodeName).collect(Collectors.toList());
}

/**
* Get the node of the DAG by the node name.
*
* @param nodeName the node name.
* @return the node of the DAG, return null if cannot find the node.
*/
DAGNode<E> getDAGNode(String nodeName);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.dag;

import lombok.Builder;
import lombok.Data;

/**
* The edge of the DAG.
* <p>
* The edge contains the fromNodeName and the toNodeName, the fromNodeName is the node name of the from node, the toNodeName is the node name of the to node.
* <p>
* The formNodeName can be null, which means the edge is from the start node of the DAG.
* The toNodeName can be null, which means the edge is to the end node of the DAG.
* The fromNodeName and the toNodeName cannot be null at the same time.
*/
@Data
@Builder
public class DAGEdge {

private String fromNodeName;
private String toNodeName;

public DAGEdge(String fromNodeName, String toNodeName) {
if (fromNodeName == null && toNodeName == null) {
throw new IllegalArgumentException("fromNodeName and toNodeName cannot be null at the same time"
+ "fromNodeName: " + fromNodeName + ", toNodeName: " + toNodeName);
}
if (fromNodeName != null && fromNodeName.equals(toNodeName)) {
throw new IllegalArgumentException("fromNodeName and toNodeName cannot be the same"
+ "fromNodeName: " + fromNodeName + ", toNodeName: " + toNodeName);
}
this.fromNodeName = fromNodeName;
this.toNodeName = toNodeName;
}
}
Loading

0 comments on commit 1f3d676

Please sign in to comment.