Skip to content

Commit

Permalink
TaskGroupPriority only compare When TaskGroup is same
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jan 17, 2024
1 parent e1fcd4e commit c9a54e9
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.dolphinscheduler.service.queue.StandByTaskInstancePriorityQueue;
import org.apache.dolphinscheduler.service.utils.DagHelper;

import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -208,7 +208,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
/**
* The StandBy task list, will be executed, need to know, the taskInstance in this queue may doesn't have id.
*/
private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
private final StandByTaskInstancePriorityQueue standByTaskInstancePriorityQueue =
new StandByTaskInstancePriorityQueue();

/**
* wait to retry taskInstance map, taskCode as key, taskInstance as value
Expand Down Expand Up @@ -249,7 +250,7 @@ public WorkflowExecuteRunnable(
this.taskInstanceDao = taskInstanceDao;
this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory;
this.listenerEventAlertManager = listenerEventAlertManager;
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size);
}

/**
Expand Down Expand Up @@ -1430,7 +1431,7 @@ && tryToTakeOverTaskInstance(existTaskInstance)) {
// if previous node success , post node submit
for (TaskInstance task : taskInstances) {

if (readyToSubmitTaskQueue.contains(task)) {
if (standByTaskInstancePriorityQueue.contains(task)) {
log.warn("Task is already at submit queue, taskInstanceName: {}", task.getName());
continue;
}
Expand Down Expand Up @@ -1665,7 +1666,7 @@ private boolean processFailed() {
return true;
}
if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
return readyToSubmitTaskQueue.size() == 0 && taskExecuteRunnableMap.size() == 0
return standByTaskInstancePriorityQueue.size() == 0 && taskExecuteRunnableMap.size() == 0
&& waitToRetryTaskInstanceMap.size() == 0;
}
}
Expand All @@ -1688,7 +1689,7 @@ private WorkflowExecutionStatus processReadyPause() {

List<TaskInstance> pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE);
if (CollectionUtils.isNotEmpty(pauseList) || workflowInstance.isBlocked() || !isComplementEnd()
|| readyToSubmitTaskQueue.size() > 0) {
|| standByTaskInstancePriorityQueue.size() > 0) {
return WorkflowExecutionStatus.PAUSE;
} else {
return WorkflowExecutionStatus.SUCCESS;
Expand All @@ -1711,8 +1712,8 @@ private WorkflowExecutionStatus processReadyBlock() {
}
}
}
if (readyToSubmitTaskQueue.size() > 0) {
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
if (standByTaskInstancePriorityQueue.size() > 0) {
for (Iterator<TaskInstance> iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
iter.next().setState(TaskExecutionStatus.PAUSE);
}
}
Expand Down Expand Up @@ -1773,7 +1774,7 @@ private WorkflowExecutionStatus getProcessInstanceState(ProcessInstance instance
// success
if (state == WorkflowExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks = getCompleteTaskByState(TaskExecutionStatus.KILL);
if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
if (standByTaskInstancePriorityQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) {
// tasks currently pending submission, no retries, indicating that depend is waiting to complete
return WorkflowExecutionStatus.RUNNING_EXECUTION;
} else if (CollectionUtils.isNotEmpty(killTasks)) {
Expand Down Expand Up @@ -1878,7 +1879,7 @@ private DependResult getDependResultForTask(TaskInstance taskInstance) {
* @param taskInstance task instance
*/
public void addTaskToStandByList(TaskInstance taskInstance) {
if (readyToSubmitTaskQueue.contains(taskInstance)) {
if (standByTaskInstancePriorityQueue.contains(taskInstance)) {
log.warn("Task already exists in ready submit queue, no need to add again, task code:{}",
taskInstance.getTaskCode());
return;
Expand All @@ -1888,7 +1889,7 @@ public void addTaskToStandByList(TaskInstance taskInstance) {
taskInstance.getId(),
taskInstance.getTaskCode());
TaskMetrics.incTaskInstanceByState("submit");
readyToSubmitTaskQueue.put(taskInstance);
standByTaskInstancePriorityQueue.put(taskInstance);
}

/**
Expand All @@ -1897,7 +1898,7 @@ public void addTaskToStandByList(TaskInstance taskInstance) {
* @param taskInstance task instance
*/
private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
return readyToSubmitTaskQueue.remove(taskInstance);
return standByTaskInstancePriorityQueue.remove(taskInstance);
}

/**
Expand All @@ -1906,7 +1907,7 @@ private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
* @return Boolean whether has retry task in standby
*/
private boolean hasRetryTaskInStandBy() {
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
for (Iterator<TaskInstance> iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) {
if (iter.next().getState().isFailure()) {
return true;
}
Expand All @@ -1923,8 +1924,8 @@ public void killAllTasks() {
workflowInstance.getId(),
taskExecuteRunnableMap.size());

if (readyToSubmitTaskQueue.size() > 0) {
readyToSubmitTaskQueue.clear();
if (standByTaskInstancePriorityQueue.size() > 0) {
standByTaskInstancePriorityQueue.clear();
}

for (long taskCode : taskExecuteRunnableMap.keySet()) {
Expand Down Expand Up @@ -1965,7 +1966,7 @@ public boolean workFlowFinish() {
public void submitStandByTask() throws StateEventHandleException {
ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance();
TaskInstance task;
while ((task = readyToSubmitTaskQueue.peek()) != null) {
while ((task = standByTaskInstancePriorityQueue.peek()) != null) {
// stop tasks which is retrying if forced success happens
if (task.getId() != null && task.taskCanRetry()) {
TaskInstance retryTask = taskInstanceDao.queryById(task.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.service.queue;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;

Expand All @@ -35,7 +34,7 @@
* Task instances priority queue implementation
* All the task instances are in the same process instance.
*/
public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
public class StandByTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {

/**
* queue size
Expand All @@ -45,7 +44,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
/**
* queue
*/
private final PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
private final PriorityQueue<TaskInstance> queue =
new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInstancePriorityComparator());
private final Set<String> taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>());

/**
Expand Down Expand Up @@ -163,24 +163,25 @@ private String getTaskInstanceIdentify(TaskInstance taskInstance) {
}

/**
* TaskInfoComparator
* This comparator is used to sort task instances in the standby queue.
* If the TaskInstance is in the same taskGroup, then we will sort the TaskInstance by {@link TaskInstance#getTaskGroupPriority()} in the taskGroup.
* Otherwise, we will sort the TaskInstance by {@link TaskInstance#getTaskInstancePriority()} in the workflow.
*/
private static class TaskInfoComparator implements Comparator<TaskInstance> {

/**
* compare o1 o2
*
* @param o1 o1
* @param o2 o2
* @return compare result
*/
private static class TaskInstancePriorityComparator implements Comparator<TaskInstance> {

@Override
public int compare(TaskInstance o1, TaskInstance o2) {
if (o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())) {
// larger number, higher priority
return Constants.OPPOSITE_VALUE * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority());
int taskInstancePriorityInWorkflow =
Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode());

if (o1.getTaskGroupId() == o2.getTaskGroupId()) {
// If at the same taskGroup
if (taskPriorityInTaskGroup != 0) {
return taskPriorityInTaskGroup;
}
}
return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
return taskInstancePriorityInWorkflow;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class PeerTaskInstancePriorityQueueTest {
public class StandByTaskInstancePriorityQueueTest {

@Test
public void put() throws TaskPriorityQueueException {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceHigPriority);
Expand All @@ -42,30 +42,31 @@ public void put() throws TaskPriorityQueueException {

@Test
public void take() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
queue.take();
Assertions.assertTrue(queue.size() < peekBeforeLength);
}

@Test
public void poll() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
Assertions.assertThrows(TaskPriorityQueueException.class, () -> {
queue.poll(1000, TimeUnit.MILLISECONDS);
});
}

@Test
public void peek() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
Assertions.assertEquals(peekBeforeLength, queue.size());
}

@Test
public void peekTaskGroupPriority() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();

TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1);
queue.put(taskInstanceMediumPriority);
Expand All @@ -80,23 +81,23 @@ public void peekTaskGroupPriority() throws Exception {
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assertions.assertEquals(taskInstance.getName(), "medium");
Assertions.assertEquals("medium", taskInstance.getName());

taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assertions.assertEquals(taskInstance.getName(), "high");
Assertions.assertEquals("medium", taskInstance.getName());

taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assertions.assertEquals(taskInstance.getName(), "high");
Assertions.assertEquals("high", taskInstance.getName());

}

Expand All @@ -107,7 +108,7 @@ public void size() throws Exception {

@Test
public void contains() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
Assertions.assertTrue(queue.contains(taskInstanceMediumPriority));
Expand All @@ -117,8 +118,8 @@ public void contains() throws Exception {
}

@Test
public void remove() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
public void remove() {
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
int peekBeforeLength = queue.size();
Expand All @@ -133,8 +134,8 @@ public void remove() throws Exception {
* @return queue
* @throws Exception
*/
private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
private StandByTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
taskInstanceHigPriority.setTaskGroupPriority(3);
Expand Down

0 comments on commit c9a54e9

Please sign in to comment.