Skip to content

Commit

Permalink
Merge branch 'dev' into fix-#15268
Browse files Browse the repository at this point in the history
  • Loading branch information
fuchanghai authored Dec 5, 2023
2 parents 1f6aa65 + 43f5f24 commit 1f82a7b
Show file tree
Hide file tree
Showing 28 changed files with 268 additions and 396 deletions.
2 changes: 2 additions & 0 deletions docs/docs/en/guide/resource/task-group.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

The task group is mainly used to control the concurrency of task instances, and is designed to control the pressure of other resources (it can also control the pressure of the Hadoop cluster, the cluster will have queue control it). When creating a new task definition, you can configure the corresponding task group and configure the priority of the task running in the task group. The user can only view the task groups belongs to authorized projects, and can create or update task groups belongs to one project only if they have write permission.

> Note: The task group's resource restrictions are on the project level and unrelated to tenants.
## Task Group Configuration

### Create Task Group
Expand Down
4 changes: 4 additions & 0 deletions docs/docs/en/guide/security/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Administrator login, default username/password: admin/dolphinscheduler123
- The queue is used when executing programs such as spark and mapreduce, and the "queue" parameter needs to be used.
- The administrator enters the `Security Center->Queue Management` page and clicks the "Create Queue" button to create a new queue.

> Note: Currently, only admin users can modify queues.
![create-queue](../../../../img/new_ui/dev/security/create-queue.png)

## Add Tenant
Expand All @@ -17,6 +19,8 @@ Administrator login, default username/password: admin/dolphinscheduler123
- Tenant Code: **The tenant code is the user on Linux, unique and cannot be repeated**
- The administrator enters the `Security Center->Tenant Management` page, and clicks the `Create Tenant` button to create a tenant.

> Note: Currently, only admin users can modify tenant.
![create-tenant](../../../../img/new_ui/dev/security/create-tenant.png)

## Create Normal User
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/zh/guide/resource/task-group.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

任务组主要用于控制任务实例并发,旨在控制其他资源的压力(也可以控制 Hadoop 集群压力,不过集群会有队列管控)。您可在新建任务定义时,可配置对应的任务组,并配置任务在任务组内运行的优先级。用户仅能查看有权限的项目对应的任务组,且仅能创建或修改具有写权限的项目对应的任务组。

> 注意:任务组的对资源的限制是在项目级别的,和租户没有关系
### 任务组配置

#### 新建任务组
Expand Down
4 changes: 4 additions & 0 deletions docs/docs/zh/guide/security/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
- 队列是在执行 spark、mapreduce 等程序,需要用到“队列”参数时使用的。
- 管理员进入安全中心 -> 队列管理页面,点击“创建队列”按钮,创建队列。

> 注意:目前仅有 admin 用户可以修改队列。
![create-queue](../../../../img/new_ui/dev/security/create-queue.png)

## 添加租户
Expand All @@ -16,6 +18,8 @@
- 租户编码:**租户编码是 Linux上 的用户,唯一,不能重复**
- 管理员进入安全中心->租户管理页面,点击“创建租户”按钮,创建租户。

> 注意:目前仅有 admin 用户可以修改租户。
![create-tenant](../../../../img/new_ui/dev/security/create-tenant.png)

## 创建普通用户
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
Expand All @@ -37,25 +35,15 @@
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IMasterLogService;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -108,8 +96,7 @@ public Result<ResponseTaskLog> queryLog(User loginUser, int taskInstId, int skip
log.error("Host of task instance is null, taskInstanceId:{}.", taskInstId);
return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
}
Project project = projectMapper.queryProjectByTaskInstanceId(taskInstId);
projectService.checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG);
projectService.checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG);
Result<ResponseTaskLog> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance, skipLineNum, limit);
int lineNum = log.split("\\r\\n").length;
Expand Down Expand Up @@ -199,7 +186,6 @@ public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) {
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
final String logPath = taskInstance.getLogPath();
final String host = taskInstance.getHost();
log.info("Query task instance log, taskInstanceId:{}, taskInstanceName:{}, host: {}, logPath:{}",
taskInstance.getId(), taskInstance.getName(), taskInstance.getHost(), logPath);
StringBuilder sb = new StringBuilder();
Expand All @@ -211,48 +197,24 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
sb.append(head);
}

String logContent = null;
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
try {
LogicTaskInstanceLogPageQueryRequest logicTaskInstanceLogPageQueryRequest =
new LogicTaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum, limit);
LogicTaskInstanceLogPageQueryResponse logicTaskInstanceLogPageQueryResponse =
masterLogService.pageQueryLogicTaskInstanceLog(logicTaskInstanceLogPageQueryRequest);
logContent = logicTaskInstanceLogPageQueryResponse.getLogContent();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
} else {
IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(host, IWorkerLogService.class);
try {
TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest =
new TaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum, limit);
TaskInstanceLogPageQueryResponse taskInstanceLogPageQueryResponse =
iWorkerLogService.pageQueryTaskInstanceLog(taskInstanceLogPageQueryRequest);
logContent = taskInstanceLogPageQueryResponse.getLogContent();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
try {
TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest.builder()
.taskInstanceId(taskInstance.getId())
.taskInstanceLogAbsolutePath(logPath)
.skipLineNum(skipLineNum)
.limit(limit)
.build();
TaskInstanceLogPageQueryResponse response = iLogService.pageQueryTaskInstanceLog(request);
String logContent = response.getLogContent();
if (logContent != null) {
sb.append(logContent);
}
return sb.toString();
} catch (Throwable ex) {
throw new ServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR, ex);
}
if (logContent == null && RemoteLogUtils.isRemoteLoggingEnable()) {
// When getting the log for the first time (skipLineNum=0) returns empty, get the log from remote target
try {
log.info("Get log {} from remote target", logPath);
RemoteLogUtils.getRemoteLog(logPath);
List<String> lines = LogUtils.readPartFileContentFromLocal(logPath, skipLineNum, limit);
logContent = LogUtils.rollViewLogLines(lines);
FileUtils.delete(new File(logPath));
} catch (IOException e) {
log.error("Error while getting log from remote target", e);
}
}
if (logContent != null) {
sb.append(logContent);
}
return sb.toString();
}

/**
Expand All @@ -271,45 +233,19 @@ private byte[] getLogBytes(TaskInstance taskInstance) {
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);

byte[] logBytes = new byte[0];
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
try {
LogicTaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest =
new LogicTaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
LogicTaskInstanceLogFileDownloadResponse logicTaskInstanceLogFileDownloadResponse =
masterLogService.getLogicTaskInstanceWholeLogFileBytes(logicTaskInstanceLogFileDownloadRequest);
logBytes = logicTaskInstanceLogFileDownloadResponse.getLogBytes();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
} else {
IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(host, IWorkerLogService.class);
try {
TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest =
new TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
TaskInstanceLogFileDownloadResponse taskInstanceWholeLogFileBytes =
iWorkerLogService.getTaskInstanceWholeLogFileBytes(taskInstanceLogFileDownloadRequest);
logBytes = taskInstanceWholeLogFileBytes.getLogBytes();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
}

if ((logBytes == null || logBytes.length == 0) && RemoteLogUtils.isRemoteLoggingEnable()) {
// get task log from remote target
try {
log.info("Get log {} from remote target", logPath);
RemoteLogUtils.getRemoteLog(logPath);
File logFile = new File(logPath);
logBytes = FileUtils.readFileToByteArray(logFile);
FileUtils.delete(logFile);
} catch (IOException e) {
log.error("Error while getting log from remote target", e);
}
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
try {
TaskInstanceLogFileDownloadRequest request =
new TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
TaskInstanceLogFileDownloadResponse response = iLogService.getTaskInstanceWholeLogFileBytes(request);
logBytes = response.getLogBytes();
return Bytes.concat(head, logBytes);
} catch (Exception ex) {
log.error("Download TaskInstance: {} Log Error", taskInstance.getName(), ex);
throw new ServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR);
}

return Bytes.concat(head, logBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IMasterLogService;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -381,18 +379,9 @@ public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
return;
}
for (TaskInstance taskInstance : needToDeleteTaskInstances) {
// delete log
if (StringUtils.isNotEmpty(taskInstance.getLogPath())) {
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
masterLogService.removeLogicTaskInstanceLog(taskInstance.getLogPath());
} else {
IWorkerLogService workerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IWorkerLogService.class);
workerLogService.removeTaskInstanceLog(taskInstance.getLogPath());
}
}
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
}

dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId);
Expand Down
Loading

0 comments on commit 1f82a7b

Please sign in to comment.