Skip to content

Commit

Permalink
Merge branch 'dev' into start-param-pass
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiajie authored Feb 1, 2024
2 parents ab48eb7 + 8974233 commit a45d24d
Show file tree
Hide file tree
Showing 58 changed files with 1,467 additions and 927 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ jobs:
fail-fast: false
matrix:
db: ["mysql", "postgresql"]
version: ["2.0.9", "3.0.6", "3.1.8", "3.2.0"]
version: ["2.0.9", "3.0.6", "3.1.9", "3.2.0"]
steps:
- name: Set up JDK 8
uses: actions/setup-java@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
-XX:+PrintGCDetails
-Xloggc:gc.log

-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=dump.hprof

Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-api/src/main/bin/jvm_args_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
-XX:+PrintGCDetails
-Xloggc:gc.log

-XX:+ExitOnOutOfMemoryError
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=dump.hprof

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public Result queryNamespaceListPaging(@Parameter(hidden = true) @RequestAttribu
}

/**
* create namespace,if not exist on k8s,will create,if exist only register in db
* register namespace in db,need to create namespace in k8s first
*
* @param loginUser
* @param namespace k8s namespace
Expand All @@ -111,7 +111,7 @@ public Result createNamespace(@Parameter(hidden = true) @RequestAttribute(value
@RequestParam(value = "namespace") String namespace,
@RequestParam(value = "clusterCode") Long clusterCode) {
Map<String, Object> result =
k8sNamespaceService.createK8sNamespace(loginUser, namespace, clusterCode);
k8sNamespaceService.registerK8sNamespace(loginUser, namespace, clusterCode);
return returnDataList(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value =
* @param pageSize page size
* @return queue list
*/
@Operation(summary = "queryTasksByGroupId", description = "QUERY_ALL_TASKS_GROUP_NOTES")
@Operation(summary = "queryTaskGroupQueuesByGroupId", description = "QUERY_TASKS_GROUP_GROUP_QUEUES")
@Parameters({
@Parameter(name = "groupId", description = "GROUP_ID", required = false, schema = @Schema(implementation = int.class, example = "1", defaultValue = "-1")),
@Parameter(name = "taskInstanceName", description = "TASK_INSTANCE_NAME", required = false, schema = @Schema(implementation = String.class, example = "taskName")),
Expand All @@ -310,15 +310,21 @@ public Result modifyPriority(@Parameter(hidden = true) @RequestAttribute(value =
@GetMapping(value = "/query-list-by-group-id")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_QUEUE_LIST_ERROR)
public Result queryTasksByGroupId(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName, processName, status,
groupId, pageNo, pageSize);
public Result queryTaskGroupQueues(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "groupId", required = false, defaultValue = "-1") Integer groupId,
@RequestParam(value = "taskInstanceName", required = false) String taskName,
@RequestParam(value = "processInstanceName", required = false) String processName,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(
loginUser,
taskName,
processName,
status,
groupId,
pageNo,
pageSize);
return returnDataList(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ public interface K8sNamespaceService {
Result queryListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize);

/**
* create namespace,if not exist on k8s,will create,if exist only register in db
* register namespace in db,need to create namespace in k8s first
*
* @param loginUser login user
* @param namespace namespace
* @param clusterCode k8s not null
* @return
*/
Map<String, Object> createK8sNamespace(User loginUser, String namespace, Long clusterCode);
Map<String, Object> registerK8sNamespace(User loginUser, String namespace, Long clusterCode);

/**
* verify namespace and k8s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ Map<String, Object> createTaskGroup(User loginUser, Long projectCode, String nam
Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
String description, int groupSize);

/**
* get task group status
*
* @param id task group id
* @return the result code and msg
*/
boolean isTheTaskGroupAvailable(int id);

/**
* query all task group by user id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
Expand Down Expand Up @@ -83,14 +82,12 @@
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.master.IStreamingTaskOperator;
import org.apache.dolphinscheduler.extract.master.ITaskInstanceExecutionEventListener;
import org.apache.dolphinscheduler.extract.master.IWorkflowInstanceService;
import org.apache.dolphinscheduler.extract.master.dto.WorkflowExecuteDto;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse;
import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceForceStartRequest;
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.service.command.CommandService;
Expand Down Expand Up @@ -552,16 +549,20 @@ public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
Map<String, Object> result = new HashMap<>();
TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.selectById(queueId);
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
log.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.",
taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
ProcessInstance processInstance = processInstanceDao.queryOptionalById(taskGroupQueue.getProcessId())
.orElseThrow(
() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId()));
checkMasterExists();

if (taskGroupQueue.getInQueue() == Flag.NO.getCode()) {
throw new ServiceException(Status.TASK_GROUP_QUEUE_ALREADY_START);
}
taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
taskGroupQueueMapper.updateById(taskGroupQueue);

checkMasterExists();
return forceStart(processInstance, taskGroupQueue);
result.put(Constants.STATUS, Status.SUCCESS);
return result;
}

public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) {
Expand Down Expand Up @@ -664,32 +665,6 @@ private Map<String, Object> updateProcessInstancePrepare(ProcessInstance process
return result;
}

/**
* prepare to update process instance command type and status
*
* @param processInstance process instance
* @return update result
*/
private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
Map<String, Object> result = new HashMap<>();
if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
log.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId());
putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START);
return result;
}

taskGroupQueue.setForceStart(Flag.YES.getCode());
taskGroupQueue.setUpdateTime(new Date());
processService.updateTaskGroupQueue(taskGroupQueue);
log.info("Sending force start command to master: {}.", processInstance.getHost());
ILogicTaskInstanceOperator iLogicTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class);
iLogicTaskInstanceOperator.forceStartTaskInstance(
new TaskInstanceForceStartRequest(processInstance.getId(), taskGroupQueue.getTaskId()));
putMsg(result, Status.SUCCESS);
return result;
}

/**
* check whether sub processes are offline before starting process definition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ public Result queryListPaging(User loginUser, String searchVal, Integer pageNo,
}

/**
* create namespace,if not exist on k8s,will create,if exist only register in db
* register namespace in db,need to create namespace in k8s first
*
* @param loginUser login user
* @param namespace namespace
* @param clusterCode k8s not null
* @return
*/
@Override
public Map<String, Object> createK8sNamespace(User loginUser, String namespace, Long clusterCode) {
public Map<String, Object> registerK8sNamespace(User loginUser, String namespace, Long clusterCode) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser)) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
* @return tasks list
*/
@Override
public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName, String processName, Integer status,
int groupId, int pageNo, int pageSize) {
public Map<String, Object> queryTasksByGroupId(User loginUser,
String taskName,
String processName,
Integer status,
int groupId,
int pageNo,
int pageSize) {
Map<String, Object> result = new HashMap<>();
Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
Expand All @@ -79,8 +84,13 @@ public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName,
return result;
}
List<Project> projects = projectMapper.selectBatchIds(projectIds);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page,
taskName, processName, status, groupId, projects);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(
page,
taskName,
processName,
status,
groupId,
projects);

pageInfo.setTotal((int) taskGroupQueue.getTotal());
pageInfo.setTotalList(taskGroupQueue.getRecords());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public Map<String, Object> createTaskGroup(User loginUser, Long projectCode, Str
.description(description)
.groupSize(groupSize)
.userId(loginUser.getId())
.status(Flag.YES.getCode())
.status(Flag.YES)
.createTime(now)
.updateTime(now)
.build();
Expand Down Expand Up @@ -180,7 +180,7 @@ public Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
if (taskGroup.getStatus() != Flag.YES.getCode()) {
if (taskGroup.getStatus() != Flag.YES) {
log.warn("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_ERROR);
return result;
Expand All @@ -202,17 +202,6 @@ public Map<String, Object> updateTaskGroup(User loginUser, int id, String name,
return result;
}

/**
* get task group status
*
* @param id task group id
* @return is the task group available
*/
@Override
public boolean isTheTaskGroupAvailable(int id) {
return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1;
}

/**
* query all task group by user id
*
Expand Down Expand Up @@ -331,12 +320,12 @@ public Map<String, Object> closeTaskGroup(User loginUser, int id) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.NO.getCode()) {
if (taskGroup.getStatus() == Flag.NO) {
log.info("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_CLOSED);
return result;
}
taskGroup.setStatus(Flag.NO.getCode());
taskGroup.setStatus(Flag.NO);
int update = taskGroupMapper.updateById(taskGroup);
if (update > 0)
log.info("Task group close complete, taskGroupId:{}.", id);
Expand Down Expand Up @@ -364,12 +353,12 @@ public Map<String, Object> startTaskGroup(User loginUser, int id) {
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.YES.getCode()) {
if (taskGroup.getStatus() == Flag.YES) {
log.info("Task group has been started, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_OPENED);
return result;
}
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setStatus(Flag.YES);
taskGroup.setUpdateTime(new Date(System.currentTimeMillis()));
int update = taskGroupMapper.updateById(taskGroup);
if (update > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ public void queryListPaging() {
public void createK8sNamespace() {
// namespace is null
Map<String, Object> result =
k8sNamespaceService.createK8sNamespace(getLoginUser(), null, clusterCode);
k8sNamespaceService.registerK8sNamespace(getLoginUser(), null, clusterCode);
logger.info(result.toString());
Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
// k8s is null
result = k8sNamespaceService.createK8sNamespace(getLoginUser(), namespace, null);
result = k8sNamespaceService.registerK8sNamespace(getLoginUser(), namespace, null);
logger.info(result.toString());
Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
// correct
Mockito.when(clusterMapper.queryByClusterCode(Mockito.anyLong())).thenReturn(getCluster());
result = k8sNamespaceService.createK8sNamespace(getLoginUser(), namespace, clusterCode);
result = k8sNamespaceService.registerK8sNamespace(getLoginUser(), namespace, clusterCode);
logger.info(result.toString());
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private TaskGroup getTaskGroup() {
.description(taskGroupDesc)
.groupSize(100)
.userId(1)
.status(Flag.YES.getCode())
.status(Flag.YES)
.build();

return taskGroup;
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testUpdate() {

User loginUser = getLoginUser();
TaskGroup taskGroup = getTaskGroup();
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setStatus(Flag.YES);
// Task group status error

Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.TASK_GROUP,
Expand All @@ -218,7 +218,6 @@ public void testUpdate() {
logger.info(result.toString());
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

taskGroup.setStatus(0);
}

@Test
Expand All @@ -236,12 +235,12 @@ public void testCloseAndStart() {
Map<String, Object> result = taskGroupService.closeTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));

taskGroup.setStatus(0);
taskGroup.setStatus(Flag.NO);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
result = taskGroupService.closeTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.TASK_GROUP_STATUS_CLOSED, result.get(Constants.STATUS));

taskGroup.setStatus(1);
taskGroup.setStatus(Flag.YES);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
result = taskGroupService.startTaskGroup(loginUser, 1);
Assertions.assertEquals(Status.TASK_GROUP_STATUS_OPENED, result.get(Constants.STATUS));
Expand Down
Loading

0 comments on commit a45d24d

Please sign in to comment.