Skip to content

Commit

Permalink
Merge branch 'dev-usable-fix-all' into fix-cant-detect-workgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
reele authored Nov 13, 2024
2 parents 4512385 + 64f2d3a commit 51e86a6
Show file tree
Hide file tree
Showing 18 changed files with 539 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public boolean isFinished() {
return isSuccess() || isFailure() || isStop() || isPause();
}

public boolean canFailover() {
return !isFinished() && !(this == SERIAL_WAIT || this == WAIT_TO_RUN);
}

/**
* status is success
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse;

@RpcService
public interface ILogicTaskInstanceOperator {
Expand All @@ -38,4 +40,7 @@ public interface ILogicTaskInstanceOperator {
@RpcMethod
LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest);

@RpcMethod
LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.master.transportor;

import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class LogicTaskTakeOverRequest implements Serializable {

private static final long serialVersionUID = -1L;

private TaskExecutionContext taskExecutionContext;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.extract.master.transportor;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class LogicTaskTakeOverResponse {

private Integer taskInstanceId;

private boolean success;

private String message;

public static LogicTaskTakeOverResponse success(Integer taskInstanceId) {
return new LogicTaskTakeOverResponse(taskInstanceId, true, "take over success");
}

public static LogicTaskTakeOverResponse failed(Integer taskInstanceId, String message) {
return new LogicTaskTakeOverResponse(taskInstanceId, false, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.commons.collections4.CollectionUtils;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void startScheduleThread() {
}

public void subscribeWorkerGroupsChange(WorkerGroupListener listener) {

// add all group when listener added
listener.onWorkerGroupAdd(new ArrayList<>(workerGroupMap.values()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ protected void assembleWorkflowInstance(
throw new IllegalArgumentException(
"The WorkflowFailoverCommandParam: " + command.getCommandParam() + " is invalid");
}
workflowInstance.setCommandType(command.getCommandType());
workflowInstance.addHistoryCmd(command.getCommandType());
workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus());
workflowInstanceDao.updateById(workflowInstance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.engine.graph;

import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
Expand Down Expand Up @@ -252,7 +253,8 @@ public boolean isTaskExecutionRunnableSkipped(final ITaskExecutionRunnable taskE

@Override
public boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable taskExecutionRunnable) {
return false;
return taskExecutionRunnable.getTaskDefinition().getFlag() == Flag.NO;
// return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,18 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceRelationMapper;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
Expand Down Expand Up @@ -157,29 +164,69 @@ private void initializeTaskExecutionContext() {
.createTaskExecutionContext(request);
}

private WorkflowInstance getValidSubWorkflowInstance() {

WorkflowInstanceRelation workflowInstanceRelation =
applicationContext.getBean(WorkflowInstanceRelationMapper.class)
.queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId());
if (workflowInstanceRelation == null || workflowInstanceRelation.getWorkflowInstanceId() == 0) {
return null;
}

WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class)
.queryDetailById(workflowInstanceRelation.getWorkflowInstanceId());

if (workflowInstance == null || !workflowInstance.getState().canFailover()) {
return null;
}

return workflowInstance;
}

private boolean takeOverTaskFromExecutor() {
checkState(isTaskInstanceInitialized(), "The task instance is null, can't take over from executor.");
if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) {
return false;
}
if (StringUtils.isEmpty(taskInstance.getHost())) {
log.debug("Task: {} host is empty, cannot take over the task from executor(This is normal case).",
taskInstance.getName());
return false;
}
try {
final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder()
.taskInstanceId(taskInstance.getId())
.workflowHost(applicationContext.getBean(MasterConfig.class).getMasterAddress())
.build();
final TakeOverTaskResponse takeOverTaskResponse = Clients
.withService(ITaskInstanceOperator.class)
.withHost(taskInstance.getHost())
.takeOverTask(takeOverTaskRequest);
return takeOverTaskResponse.isSuccess();
} catch (Exception ex) {
log.warn("Take over task: {} failed", taskInstance.getName(), ex);
return false;

if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) {
if (!taskInstance.getTaskType().equals(SubWorkflowLogicTaskChannelFactory.NAME)) {
return false;
}
final WorkflowInstance subWorkflowInstance = getValidSubWorkflowInstance();
if (subWorkflowInstance == null) {
return false;
}
try {
final LogicTaskTakeOverRequest takeOverTaskRequest =
new LogicTaskTakeOverRequest(taskExecutionContext);

final LogicTaskTakeOverResponse takeOverTaskResponse = Clients
.withService(ILogicTaskInstanceOperator.class)
.withHost(taskInstance.getHost())
.takeOverLogicTask(takeOverTaskRequest);
return takeOverTaskResponse.isSuccess();
} catch (Exception ex) {
log.warn("Take over logic task: {} failed", taskInstance.getName(), ex);
return false;
}
} else {
try {
final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder()
.taskInstanceId(taskInstance.getId())
.workflowHost(applicationContext.getBean(MasterConfig.class).getMasterAddress())
.build();
final TakeOverTaskResponse takeOverTaskResponse = Clients
.withService(ITaskInstanceOperator.class)
.withHost(taskInstance.getHost())
.takeOverTask(takeOverTaskRequest);
return takeOverTaskResponse.isSuccess();
} catch (Exception ex) {
log.warn("Take over task: {} failed", taskInstance.getName(), ex);
return false;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.rpc;

import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager;
import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LogicITaskInstanceTakeOverOperationFunction
implements
ITaskInstanceOperationFunction<LogicTaskTakeOverRequest, LogicTaskTakeOverResponse> {

@Autowired
private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder;

@Autowired
private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool;

@Autowired
private LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager;

@Override
public LogicTaskTakeOverResponse operate(LogicTaskTakeOverRequest taskTakeoverRequest) {
log.info("Received dispatchLogicTask request: {}", taskTakeoverRequest);
TaskExecutionContext taskExecutionContext = taskTakeoverRequest.getTaskExecutionContext();
try {
final int taskInstanceId = taskExecutionContext.getTaskInstanceId();
final int workflowInstanceId = taskExecutionContext.getWorkflowInstanceId();
final String taskInstanceName = taskExecutionContext.getTaskName();

taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));

LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId);
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());

MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext);

final MasterTaskExecutor masterTaskExecutor = masterTaskExecutorFactoryBuilder
.createMasterTaskExecutorFactory(taskExecutionContext.getTaskType())
.createMasterTaskExecutor(taskExecutionContext);

if (masterTaskExecutorThreadPool.takeOverMasterTaskExecutor(masterTaskExecutor)) {
log.info("Take over LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName);
return LogicTaskTakeOverResponse.success(taskInstanceId);
} else {
log.error("Take over LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName);
return LogicTaskTakeOverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full");
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class LogicTaskInstanceOperationFunctionManager {
@Autowired
private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction;

@Autowired
private LogicITaskInstanceTakeOverOperationFunction logicITaskInstanceTakeOverOperationFunction;

public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() {
return logicITaskInstanceDispatchOperationFunction;
}
Expand All @@ -44,4 +47,8 @@ public LogicITaskInstancePauseOperationFunction getLogicTaskInstancePauseOperati
return logicITaskInstancePauseOperationFunction;
}

public LogicITaskInstanceTakeOverOperationFunction getLogicTaskInstanceTakeOverOperationFunction() {
return logicITaskInstanceTakeOverOperationFunction;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -55,4 +57,10 @@ public LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequ
.operate(taskPauseRequest);
}

@Override
public LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest) {
return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeOverOperationFunction()
.operate(taskTakeOverRequest);
}

}
Loading

0 comments on commit 51e86a6

Please sign in to comment.