Skip to content

Commit

Permalink
Refactor Master threadPool
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Feb 20, 2024
1 parent 8a35e8b commit 80446ae
Show file tree
Hide file tree
Showing 118 changed files with 1,878 additions and 794 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,17 @@

package org.apache.dolphinscheduler.server.master.cache;

import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;

import java.util.Collection;

import lombok.NonNull;

/**
* cache of process instance id and WorkflowExecuteThread
*/
public interface ProcessInstanceExecCacheManager {
public interface IWorkflowExecuteRunnableRepository<W> {

/**
* get WorkflowExecuteThread by process instance id
* Get IWorkflowExecutionRunnable by workflowInstance id
*
* @param processInstanceId processInstanceId
* @return WorkflowExecuteThread
* @param processInstanceId workflowInstanceId
* @return IWorkflowExecutionRunnable
*/
WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);
W getByProcessInstanceId(int processInstanceId);

/**
* judge the process instance does it exist
Expand All @@ -57,14 +50,14 @@ public interface ProcessInstanceExecCacheManager {
* @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached
*/
void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);
void cache(int processInstanceId, W workflowExecuteThread);

/**
* get all WorkflowExecuteThread from cache
*
* @return all WorkflowExecuteThread in cache
*/
Collection<WorkflowExecuteRunnable> getAll();
Collection<W> getAll();

void clearCache();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,33 @@

package org.apache.dolphinscheduler.server.master.cache.impl;

import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecutionRunnable;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;

import lombok.NonNull;

import org.springframework.stereotype.Component;

import com.google.common.collect.ImmutableList;

/**
* cache of process instance id and WorkflowExecuteThread
*/
@Component
public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager {
public class WorkflowExecuteRunnableRepositoryImpl
implements
IWorkflowExecuteRunnableRepository<IWorkflowExecutionRunnable> {

private final ConcurrentHashMap<Integer, WorkflowExecuteRunnable> processInstanceExecMaps =
new ConcurrentHashMap<>();
private final Map<Integer, IWorkflowExecutionRunnable> processInstanceExecMaps = new ConcurrentHashMap<>();

@PostConstruct
public void registerMetrics() {
public WorkflowExecuteRunnableRepositoryImpl() {
ProcessInstanceMetrics.registerProcessInstanceRunningGauge(processInstanceExecMaps::size);
}

@Override
public WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId) {
public IWorkflowExecutionRunnable getByProcessInstanceId(int processInstanceId) {
return processInstanceExecMaps.get(processInstanceId);
}

Expand All @@ -62,12 +58,12 @@ public void removeByProcessInstanceId(int processInstanceId) {
}

@Override
public void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread) {
public void cache(int processInstanceId, @NonNull IWorkflowExecutionRunnable workflowExecuteThread) {
processInstanceExecMaps.put(processInstanceId, workflowExecuteThread);
}

@Override
public Collection<WorkflowExecuteRunnable> getAll() {
public Collection<IWorkflowExecutionRunnable> getAll() {
return ImmutableList.copyOf(processInstanceExecMaps.values());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.DefaultWorkflowExecutionRunnable;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -59,7 +59,7 @@ public class MasterConfig implements Validator {
/**
* todo: We may need to split the process/task into different thread size.
* The thread number used to handle processInstance and task event.
* Will create two thread poll to execute {@link WorkflowExecuteRunnable} and {@link TaskExecuteRunnable}.
* Will create two thread poll to execute {@link DefaultWorkflowExecutionRunnable} and {@link TaskExecuteRunnable}.
*/
private int execThreads = 10;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.event;

import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.DefaultWorkflowExecutionRunnable;

public interface StateEventHandler {

Expand All @@ -30,7 +30,7 @@ public interface StateEventHandler {
* @throws StateEventHandleError this exception means it cannot be recovered, so the event need to drop.
* @throws StateEventHandleException this means it can be recovered.
*/
boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
boolean handleStateEvent(DefaultWorkflowExecutionRunnable workflowExecuteRunnable,
StateEvent stateEvent) throws StateEventHandleException, StateEventHandleError, StateEventHandleFailure;

StateEventType getEventType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.DefaultWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
Expand All @@ -40,7 +40,7 @@
public class TaskCacheEventHandler implements TaskEventHandler {

@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private IWorkflowExecuteRunnableRepository IWorkflowExecuteRunnableRepository;

@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
Expand All @@ -64,8 +64,9 @@ public void handleTaskEvent(TaskEvent taskEvent) {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();

WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
DefaultWorkflowExecutionRunnable workflowExecuteRunnable =
IWorkflowExecuteRunnableRepository.getByProcessInstanceId(
processInstanceId);
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
if (!taskInstanceOptional.isPresent()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceExecutionEventAckListener;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionRunningEventAck;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.DefaultWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;

import java.util.Optional;
Expand All @@ -43,7 +43,7 @@
public class TaskDelayEventHandler implements TaskEventHandler {

@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private IWorkflowExecuteRunnableRepository IWorkflowExecuteRunnableRepository;

@Autowired
private TaskInstanceDao taskInstanceDao;
Expand All @@ -59,8 +59,8 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();

WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
DefaultWorkflowExecutionRunnable workflowExecuteRunnable =
this.IWorkflowExecuteRunnableRepository.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError("Cannot find related workflow instance from cache");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecutionRunnable;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -36,7 +36,7 @@
public class TaskDispatchEventHandler implements TaskEventHandler {

@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private IWorkflowExecuteRunnableRepository IWorkflowExecuteRunnableRepository;

@Autowired
private TaskInstanceDao taskInstanceDao;
Expand All @@ -46,13 +46,13 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();

WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
IWorkflowExecutionRunnable workflowExecuteRunnable =
this.IWorkflowExecuteRunnableRepository.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
throw new TaskEventHandleError("Cannot find related workflow instance from cache");
}
TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId)
.orElseThrow(() -> new TaskEventHandleError("Cannot find related taskInstance from cache"));
TaskInstance taskInstance = workflowExecuteRunnable.getTaskExecutionRunnableById(taskInstanceId)
.getTaskExecutionRunnableContext().getTaskInstance();
if (taskInstance.getState() != TaskExecutionStatus.SUBMITTED_SUCCESS) {
log.warn(
"The current taskInstance status is not SUBMITTED_SUCCESS, so the dispatch event will be discarded, the current is a delay event, event: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceExecutionEventAckListener;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionFinishEventAck;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;

import java.util.Optional;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -45,7 +43,7 @@
public class TaskResultEventHandler implements TaskEventHandler {

@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private IWorkflowExecuteRunnableRepository IWorkflowExecuteRunnableRepository;

@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
Expand All @@ -67,20 +65,21 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, Ta
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();

WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
IWorkflowExecutionRunnable workflowExecuteRunnable =
this.IWorkflowExecuteRunnableRepository.getByProcessInstanceId(
processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task result event error, cannot find related workflow instance from cache, will discard this event");
}
Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
if (!taskInstanceOptional.isPresent()) {
TaskInstance taskInstance = workflowExecuteRunnable.getTaskExecutionRunnableById(taskInstanceId)
.getTaskExecutionRunnableContext().getTaskInstance();
if (taskInstance == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
"Handle task result event error, cannot find the taskInstance from cache, will discord this event");
}
TaskInstance taskInstance = taskInstanceOptional.get();
if (taskInstance.getState().isFinished()) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.DefaultWorkflowExecutionRunnable;

import java.util.Map;

Expand All @@ -33,7 +33,7 @@
public class TaskRetryStateEventHandler implements StateEventHandler {

@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
public boolean handleStateEvent(DefaultWorkflowExecutionRunnable workflowExecuteRunnable,
StateEvent stateEvent) throws StateEventHandleException {
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceExecutionEventAckListener;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceExecutionRunningEventAck;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.cache.IWorkflowExecuteRunnableRepository;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.DefaultWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;

import java.util.Optional;
Expand All @@ -40,7 +40,7 @@
public class TaskRunningEventHandler implements TaskEventHandler {

@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private IWorkflowExecuteRunnableRepository IWorkflowExecuteRunnableRepository;

@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
Expand All @@ -56,8 +56,8 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();

WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
DefaultWorkflowExecutionRunnable workflowExecuteRunnable = null;
// this.IWorkflowExecuteRunnableRepository.getByProcessInstanceId(processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.DefaultWorkflowExecutionRunnable;

import java.util.Optional;
import java.util.Set;
Expand All @@ -34,7 +34,7 @@
public class TaskStateEventHandler implements StateEventHandler {

@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
public boolean handleStateEvent(DefaultWorkflowExecutionRunnable workflowExecuteRunnable,
StateEvent stateEvent) throws StateEventHandleException, StateEventHandleError {
TaskStateEvent taskStateEvent = (TaskStateEvent) stateEvent;
measureTaskState(taskStateEvent);
Expand Down
Loading

0 comments on commit 80446ae

Please sign in to comment.