diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index b558d6cf3745..3759f2209c2b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -91,7 +91,7 @@ import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; +import org.apache.dolphinscheduler.service.queue.StandByTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.utils.DagHelper; import org.apache.commons.collections4.CollectionUtils; @@ -208,7 +208,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { /** * The StandBy task list, will be executed, need to know, the taskInstance in this queue may doesn't have id. 
*/ - private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); + private final StandByTaskInstancePriorityQueue standByTaskInstancePriorityQueue = + new StandByTaskInstancePriorityQueue(); /** * wait to retry taskInstance map, taskCode as key, taskInstance as value @@ -249,7 +250,7 @@ public WorkflowExecuteRunnable( this.taskInstanceDao = taskInstanceDao; this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory; this.listenerEventAlertManager = listenerEventAlertManager; - TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); + TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size); } /** @@ -1430,7 +1431,7 @@ && tryToTakeOverTaskInstance(existTaskInstance)) { // if previous node success , post node submit for (TaskInstance task : taskInstances) { - if (readyToSubmitTaskQueue.contains(task)) { + if (standByTaskInstancePriorityQueue.contains(task)) { log.warn("Task is already at submit queue, taskInstanceName: {}", task.getName()); continue; } @@ -1665,7 +1666,7 @@ private boolean processFailed() { return true; } if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { - return readyToSubmitTaskQueue.size() == 0 && taskExecuteRunnableMap.size() == 0 + return standByTaskInstancePriorityQueue.size() == 0 && taskExecuteRunnableMap.size() == 0 && waitToRetryTaskInstanceMap.size() == 0; } } @@ -1688,7 +1689,7 @@ private WorkflowExecutionStatus processReadyPause() { List pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE); if (CollectionUtils.isNotEmpty(pauseList) || workflowInstance.isBlocked() || !isComplementEnd() - || readyToSubmitTaskQueue.size() > 0) { + || standByTaskInstancePriorityQueue.size() > 0) { return WorkflowExecutionStatus.PAUSE; } else { return WorkflowExecutionStatus.SUCCESS; @@ -1711,8 +1712,8 @@ private WorkflowExecutionStatus processReadyBlock() { } } } - if (readyToSubmitTaskQueue.size() > 0) { - for (Iterator iter = 
readyToSubmitTaskQueue.iterator(); iter.hasNext();) { + if (standByTaskInstancePriorityQueue.size() > 0) { + for (Iterator iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) { iter.next().setState(TaskExecutionStatus.PAUSE); } } @@ -1773,7 +1774,7 @@ private WorkflowExecutionStatus getProcessInstanceState(ProcessInstance instance // success if (state == WorkflowExecutionStatus.RUNNING_EXECUTION) { List killTasks = getCompleteTaskByState(TaskExecutionStatus.KILL); - if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) { + if (standByTaskInstancePriorityQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) { // tasks currently pending submission, no retries, indicating that depend is waiting to complete return WorkflowExecutionStatus.RUNNING_EXECUTION; } else if (CollectionUtils.isNotEmpty(killTasks)) { @@ -1878,7 +1879,7 @@ private DependResult getDependResultForTask(TaskInstance taskInstance) { * @param taskInstance task instance */ public void addTaskToStandByList(TaskInstance taskInstance) { - if (readyToSubmitTaskQueue.contains(taskInstance)) { + if (standByTaskInstancePriorityQueue.contains(taskInstance)) { log.warn("Task already exists in ready submit queue, no need to add again, task code:{}", taskInstance.getTaskCode()); return; @@ -1888,7 +1889,7 @@ public void addTaskToStandByList(TaskInstance taskInstance) { taskInstance.getId(), taskInstance.getTaskCode()); TaskMetrics.incTaskInstanceByState("submit"); - readyToSubmitTaskQueue.put(taskInstance); + standByTaskInstancePriorityQueue.put(taskInstance); } /** @@ -1897,7 +1898,7 @@ public void addTaskToStandByList(TaskInstance taskInstance) { * @param taskInstance task instance */ private boolean removeTaskFromStandbyList(TaskInstance taskInstance) { - return readyToSubmitTaskQueue.remove(taskInstance); + return standByTaskInstancePriorityQueue.remove(taskInstance); } /** @@ -1906,7 +1907,7 @@ private boolean removeTaskFromStandbyList(TaskInstance taskInstance) 
{ * @return Boolean whether has retry task in standby */ private boolean hasRetryTaskInStandBy() { - for (Iterator iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) { + for (Iterator iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) { if (iter.next().getState().isFailure()) { return true; } @@ -1923,8 +1924,8 @@ public void killAllTasks() { workflowInstance.getId(), taskExecuteRunnableMap.size()); - if (readyToSubmitTaskQueue.size() > 0) { - readyToSubmitTaskQueue.clear(); + if (standByTaskInstancePriorityQueue.size() > 0) { + standByTaskInstancePriorityQueue.clear(); } for (long taskCode : taskExecuteRunnableMap.keySet()) { @@ -1965,7 +1966,7 @@ public boolean workFlowFinish() { public void submitStandByTask() throws StateEventHandleException { ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); TaskInstance task; - while ((task = readyToSubmitTaskQueue.peek()) != null) { + while ((task = standByTaskInstancePriorityQueue.peek()) != null) { // stop tasks which is retrying if forced success happens if (task.getId() != null && task.taskCanRetry()) { TaskInstance retryTask = taskInstanceDao.queryById(task.getId()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java similarity index 79% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java index 5bd89ffad539..c11c4fe5a9e3 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.service.queue; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; @@ -35,7 +34,7 @@ * Task instances priority queue implementation * All the task instances are in the same process instance. */ -public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue { +public class StandByTaskInstancePriorityQueue implements TaskPriorityQueue { /** * queue size @@ -45,7 +44,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + private final PriorityQueue queue = + new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInstancePriorityComparator()); private final Set taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>()); /** @@ -163,24 +163,25 @@ private String getTaskInstanceIdentify(TaskInstance taskInstance) { } /** - * TaskInfoComparator + * This comparator is used to sort task instances in the standby queue. + * If two TaskInstances are in the same taskGroup and their {@link TaskInstance#getTaskGroupPriority()} differ, the one with the larger task group priority is sorted first. + * Otherwise (different taskGroup, or equal task group priority), they are sorted by the code of {@link TaskInstance#getTaskInstancePriority()} in the workflow, smaller code first. 
*/ - private static class TaskInfoComparator implements Comparator { - - /** - * compare o1 o2 - * - * @param o1 o1 - * @param o2 o2 - * @return compare result - */ + private static class TaskInstancePriorityComparator implements Comparator { + @Override public int compare(TaskInstance o1, TaskInstance o2) { - if (o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())) { - // larger number, higher priority - return Constants.OPPOSITE_VALUE * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority()); + int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority()); + int taskInstancePriorityInWorkflow = + Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode()); + + if (o1.getTaskGroupId() == o2.getTaskGroupId()) { + // If at the same taskGroup + if (taskPriorityInTaskGroup != 0) { + return taskPriorityInTaskGroup; + } } - return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority()); + return taskInstancePriorityInWorkflow; } } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java similarity index 84% rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java index a430ab3c950f..80a25139299b 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java @@ -26,11 +26,11 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; 
-public class PeerTaskInstancePriorityQueueTest { +public class StandByTaskInstancePriorityQueueTest { @Test public void put() throws TaskPriorityQueueException { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceHigPriority); @@ -42,7 +42,7 @@ public void put() throws TaskPriorityQueueException { @Test public void take() throws Exception { - PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); queue.take(); Assertions.assertTrue(queue.size() < peekBeforeLength); @@ -50,7 +50,7 @@ public void take() throws Exception { @Test public void poll() throws Exception { - PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); Assertions.assertThrows(TaskPriorityQueueException.class, () -> { queue.poll(1000, TimeUnit.MILLISECONDS); }); @@ -58,14 +58,15 @@ public void poll() throws Exception { @Test public void peek() throws Exception { - PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); Assertions.assertEquals(peekBeforeLength, queue.size()); } @Test public void peekTaskGroupPriority() throws Exception { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); + TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2); TaskInstance taskInstanceMediumPriority = 
createTaskInstance("medium", Priority.HIGH, 1); queue.put(taskInstanceMediumPriority); @@ -80,7 +81,7 @@ public void peekTaskGroupPriority() throws Exception { queue.put(taskInstanceHigPriority); taskInstance = queue.peek(); queue.clear(); - Assertions.assertEquals(taskInstance.getName(), "medium"); + Assertions.assertEquals("medium", taskInstance.getName()); taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2); @@ -88,7 +89,7 @@ public void peekTaskGroupPriority() throws Exception { queue.put(taskInstanceHigPriority); taskInstance = queue.peek(); queue.clear(); - Assertions.assertEquals(taskInstance.getName(), "high"); + Assertions.assertEquals("medium", taskInstance.getName()); taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); @@ -96,7 +97,7 @@ public void peekTaskGroupPriority() throws Exception { queue.put(taskInstanceHigPriority); taskInstance = queue.peek(); queue.clear(); - Assertions.assertEquals(taskInstance.getName(), "high"); + Assertions.assertEquals("high", taskInstance.getName()); } @@ -107,7 +108,7 @@ public void size() throws Exception { @Test public void contains() throws Exception { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceMediumPriority); Assertions.assertTrue(queue.contains(taskInstanceMediumPriority)); @@ -117,8 +118,8 @@ public void contains() throws Exception { } @Test - public void remove() throws Exception { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + public void remove() { + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); TaskInstance 
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceMediumPriority); int peekBeforeLength = queue.size(); @@ -133,8 +134,8 @@ public void remove() throws Exception { * @return queue * @throws Exception */ - private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + private StandByTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); taskInstanceHigPriority.setTaskGroupPriority(3);