Skip to content

Commit

Permalink
[Improvement] Move delay calculation to Master (apache#15278)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Dec 6, 2023
1 parent 43f5f24 commit 2119e41
Show file tree
Hide file tree
Showing 65 changed files with 403 additions and 579 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,14 @@

package org.apache.dolphinscheduler.server.master.rpc;

import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableFactoryBuilder;
import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;

import java.util.concurrent.TimeUnit;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -42,10 +38,10 @@ public class LogicITaskInstanceDispatchOperationFunction
ITaskInstanceOperationFunction<LogicTaskDispatchRequest, LogicTaskDispatchResponse> {

@Autowired
private MasterTaskExecuteRunnableFactoryBuilder masterTaskExecuteRunnableFactoryBuilder;
private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder;

@Autowired
private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue;
private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;

@Override
public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRequest) {
Expand All @@ -63,34 +59,17 @@ public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRe

MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext);

int delayTime = taskExecutionContext.getDelayTime();
if (delayTime > 0) {
// todo: calculate the delay in master dispatcher then we don't need to use a queue to store the task
final long remainTime =
DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
TimeUnit.SECONDS.toMillis(delayTime));
if (remainTime > 0) {
log.info(
"Current taskInstance: {} is choosing delay execution, delay time: {}/ms, remainTime: {}/ms",
taskExecutionContext.getTaskName(),
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
// todo: send delay execution message
return LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
}
}
final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable =
masterTaskExecuteRunnableFactoryBuilder
.createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType())
.createWorkerTaskExecuteRunnable(taskExecutionContext);
if (masterDelayTaskExecuteRunnableDelayQueue
.submitMasterDelayTaskExecuteRunnable(masterDelayTaskExecuteRunnable)) {
MasterTaskExecutor masterTaskExecutor = masterTaskExecutorFactoryBuilder
.createMasterTaskExecutorFactory(taskExecutionContext.getTaskType())
.createMasterTaskExecutor(taskExecutionContext);
if (globalMasterTaskExecuteRunnableQueue
.submitMasterTaskExecuteRunnable(masterTaskExecutor)) {
log.info("Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName);
return LogicTaskDispatchResponse.success(taskInstanceId);
} else {
log.error(
"Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue size: {} is full",
taskInstanceName, masterDelayTaskExecuteRunnableDelayQueue.size());
taskInstanceName, globalMasterTaskExecuteRunnableQueue.size());
return LogicTaskDispatchResponse.failed(taskInstanceId,
"MasterDelayTaskExecuteRunnableDelayQueue is full");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -38,32 +38,32 @@ public class LogicITaskInstanceKillOperationFunction
ITaskInstanceOperationFunction<LogicTaskKillRequest, LogicTaskKillResponse> {

@Autowired
private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue;
private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;

@Override
public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) {
final int taskInstanceId = taskKillRequest.getTaskInstanceId();
try {
LogUtils.setTaskInstanceIdMDC(taskKillRequest.getTaskInstanceId());
log.info("Received killLogicTask request: {}", taskKillRequest);
final MasterTaskExecuteRunnable masterTaskExecuteRunnable =
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstanceId);
if (masterTaskExecuteRunnable == null) {
final MasterTaskExecutor masterTaskExecutor =
MasterTaskExecutorHolder.getMasterTaskExecutor(taskInstanceId);
if (masterTaskExecutor == null) {
log.error("Cannot find the MasterTaskExecuteRunnable, this task may already been killed");
return LogicTaskKillResponse.fail("Cannot find the MasterTaskExecuteRunnable");
}
try {
masterTaskExecuteRunnable.cancelTask();
masterDelayTaskExecuteRunnableDelayQueue
.removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable);
masterTaskExecutor.cancelTask();
globalMasterTaskExecuteRunnableQueue
.removeMasterTaskExecuteRunnable(masterTaskExecutor);
return LogicTaskKillResponse.success();
} catch (MasterTaskExecuteException e) {
log.error("Cancel MasterTaskExecuteRunnable failed ", e);
return LogicTaskKillResponse.fail("Cancel MasterTaskExecuteRunnable failed: " + e.getMessage());
} finally {
// todo: If cancel failed, we cannot remove the context?
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskInstanceId);
}
} finally {
LogUtils.removeTaskInstanceIdMDC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -39,16 +39,16 @@ public class LogicITaskInstancePauseOperationFunction
public LogicTaskPauseResponse operate(LogicTaskPauseRequest taskPauseRequest) {
try {
LogUtils.setTaskInstanceIdMDC(taskPauseRequest.getTaskInstanceId());
final MasterTaskExecuteRunnable masterTaskExecuteRunnable =
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
if (masterTaskExecuteRunnable == null) {
final MasterTaskExecutor masterTaskExecutor =
MasterTaskExecutorHolder.getMasterTaskExecutor(taskPauseRequest.getTaskInstanceId());
if (masterTaskExecutor == null) {
log.info("Cannot find the MasterTaskExecuteRunnable");
return LogicTaskPauseResponse.fail("Cannot find the MasterTaskExecuteRunnable");
}
final TaskExecutionContext taskExecutionContext = masterTaskExecuteRunnable.getTaskExecutionContext();
final TaskExecutionContext taskExecutionContext = masterTaskExecutor.getTaskExecutionContext();
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
masterTaskExecuteRunnable.pauseTask();
masterTaskExecutor.pauseTask();
return LogicTaskPauseResponse.success();
} catch (MasterTaskExecuteException e) {
log.error("Pause MasterTaskExecuteRunnable failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;

import java.util.Date;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.runner;

import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;

public abstract class BaseTaskExecuteRunnable implements TaskExecuteRunnable {

protected final ProcessInstance workflowInstance;
protected final TaskInstance taskInstance;
protected final TaskExecutionContext taskExecutionContext;

public BaseTaskExecuteRunnable(ProcessInstance workflowInstance,
TaskInstance taskInstance,
TaskExecutionContext taskExecutionContext) {
this.taskInstance = checkNotNull(taskInstance);
this.workflowInstance = checkNotNull(workflowInstance);
this.taskExecutionContext = checkNotNull(taskExecutionContext);
}

@Override
public ProcessInstance getWorkflowInstance() {
return workflowInstance;
}

@Override
public TaskInstance getTaskInstance() {
return taskInstance;
}

@Override
public TaskExecutionContext getTaskExecutionContext() {
return taskExecutionContext;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.runner.execute;
package org.apache.dolphinscheduler.server.master.runner;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -24,7 +24,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;

public class DefaultTaskExecuteRunnable extends PriorityTaskExecuteRunnable {
public class DefaultTaskExecuteRunnable extends PriorityDelayTaskExecuteRunnable {

private final TaskExecuteRunnableOperatorManager taskExecuteRunnableOperatorManager;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

/**
*
*/
@Component
public class GlobalMasterTaskExecuteRunnableQueue {

private final BlockingQueue<MasterTaskExecutor> masterTaskExecutorBlockingQueue =
new LinkedBlockingQueue<>();

public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) {
return masterTaskExecutorBlockingQueue.offer(masterTaskExecutor);
}

public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws InterruptedException {
return masterTaskExecutorBlockingQueue.take();
}

public boolean removeMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) {
return masterTaskExecutorBlockingQueue.remove(masterTaskExecutor);
}

public int size() {
return masterTaskExecutorBlockingQueue.size();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -31,17 +31,17 @@

@Slf4j
@Component
public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends BaseDaemonThread implements AutoCloseable {
public class GlobalMasterTaskExecuteRunnableQueueLooper extends BaseDaemonThread implements AutoCloseable {

@Autowired
private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue;
private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;

@Autowired
private MasterTaskExecuteRunnableThreadPool masterTaskExecuteRunnableThreadPool;
private MasterTaskExecutorThreadPool masterTaskExecutorThreadPool;

private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);

public MasterDelayTaskExecuteRunnableDelayQueueLooper() {
public GlobalMasterTaskExecuteRunnableQueueLooper() {
super("MasterDelayTaskExecuteRunnableDelayQueueLooper");
}

Expand All @@ -53,18 +53,18 @@ public synchronized void start() {
}
log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper starting...");
super.start();
masterTaskExecuteRunnableThreadPool.start();
masterTaskExecutorThreadPool.start();
log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper started...");
}

@Override
public void run() {
while (RUNNING_FLAG.get()) {
try {
final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable =
masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable();
masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
final MasterTaskExecutor masterTaskExecutor =
globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable();
masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor);
MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has been interrupted, will stop loop");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.DelayQueue;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -29,7 +27,7 @@
@Component
public class GlobalTaskDispatchWaitingQueue {

private final PriorityBlockingQueue<DefaultTaskExecuteRunnable> queue = new PriorityBlockingQueue<>();
private final DelayQueue<DefaultTaskExecuteRunnable> queue = new DelayQueue<>();

public void submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) {
queue.put(priorityTaskExecuteRunnable);
Expand Down
Loading

0 comments on commit 2119e41

Please sign in to comment.