Skip to content

Commit

Permalink
Add dolphinscheduler-extract-common module (#15266)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Dec 5, 2023
1 parent a6e30fd commit 43f5f24
Show file tree
Hide file tree
Showing 24 changed files with 256 additions and 396 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
Expand All @@ -37,25 +35,15 @@
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IMasterLogService;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -108,8 +96,7 @@ public Result<ResponseTaskLog> queryLog(User loginUser, int taskInstId, int skip
log.error("Host of task instance is null, taskInstanceId:{}.", taskInstId);
return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
}
Project project = projectMapper.queryProjectByTaskInstanceId(taskInstId);
projectService.checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG);
projectService.checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG);
Result<ResponseTaskLog> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance, skipLineNum, limit);
int lineNum = log.split("\\r\\n").length;
Expand Down Expand Up @@ -199,7 +186,6 @@ public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) {
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
final String logPath = taskInstance.getLogPath();
final String host = taskInstance.getHost();
log.info("Query task instance log, taskInstanceId:{}, taskInstanceName:{}, host: {}, logPath:{}",
taskInstance.getId(), taskInstance.getName(), taskInstance.getHost(), logPath);
StringBuilder sb = new StringBuilder();
Expand All @@ -211,48 +197,24 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
sb.append(head);
}

String logContent = null;
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
try {
LogicTaskInstanceLogPageQueryRequest logicTaskInstanceLogPageQueryRequest =
new LogicTaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum, limit);
LogicTaskInstanceLogPageQueryResponse logicTaskInstanceLogPageQueryResponse =
masterLogService.pageQueryLogicTaskInstanceLog(logicTaskInstanceLogPageQueryRequest);
logContent = logicTaskInstanceLogPageQueryResponse.getLogContent();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
} else {
IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(host, IWorkerLogService.class);
try {
TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest =
new TaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum, limit);
TaskInstanceLogPageQueryResponse taskInstanceLogPageQueryResponse =
iWorkerLogService.pageQueryTaskInstanceLog(taskInstanceLogPageQueryRequest);
logContent = taskInstanceLogPageQueryResponse.getLogContent();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
try {
TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest.builder()
.taskInstanceId(taskInstance.getId())
.taskInstanceLogAbsolutePath(logPath)
.skipLineNum(skipLineNum)
.limit(limit)
.build();
TaskInstanceLogPageQueryResponse response = iLogService.pageQueryTaskInstanceLog(request);
String logContent = response.getLogContent();
if (logContent != null) {
sb.append(logContent);
}
return sb.toString();
} catch (Throwable ex) {
throw new ServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR, ex);
}
if (logContent == null && RemoteLogUtils.isRemoteLoggingEnable()) {
// When getting the log for the first time (skipLineNum=0) returns empty, get the log from remote target
try {
log.info("Get log {} from remote target", logPath);
RemoteLogUtils.getRemoteLog(logPath);
List<String> lines = LogUtils.readPartFileContentFromLocal(logPath, skipLineNum, limit);
logContent = LogUtils.rollViewLogLines(lines);
FileUtils.delete(new File(logPath));
} catch (IOException e) {
log.error("Error while getting log from remote target", e);
}
}
if (logContent != null) {
sb.append(logContent);
}
return sb.toString();
}

/**
Expand All @@ -271,45 +233,19 @@ private byte[] getLogBytes(TaskInstance taskInstance) {
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);

byte[] logBytes = new byte[0];
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
try {
LogicTaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest =
new LogicTaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
LogicTaskInstanceLogFileDownloadResponse logicTaskInstanceLogFileDownloadResponse =
masterLogService.getLogicTaskInstanceWholeLogFileBytes(logicTaskInstanceLogFileDownloadRequest);
logBytes = logicTaskInstanceLogFileDownloadResponse.getLogBytes();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
} else {
IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(host, IWorkerLogService.class);
try {
TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest =
new TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
TaskInstanceLogFileDownloadResponse taskInstanceWholeLogFileBytes =
iWorkerLogService.getTaskInstanceWholeLogFileBytes(taskInstanceLogFileDownloadRequest);
logBytes = taskInstanceWholeLogFileBytes.getLogBytes();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
}

if ((logBytes == null || logBytes.length == 0) && RemoteLogUtils.isRemoteLoggingEnable()) {
// get task log from remote target
try {
log.info("Get log {} from remote target", logPath);
RemoteLogUtils.getRemoteLog(logPath);
File logFile = new File(logPath);
logBytes = FileUtils.readFileToByteArray(logFile);
FileUtils.delete(logFile);
} catch (IOException e) {
log.error("Error while getting log from remote target", e);
}
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
try {
TaskInstanceLogFileDownloadRequest request =
new TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
TaskInstanceLogFileDownloadResponse response = iLogService.getTaskInstanceWholeLogFileBytes(request);
logBytes = response.getLogBytes();
return Bytes.concat(head, logBytes);
} catch (Exception ex) {
log.error("Download TaskInstance: {} Log Error", taskInstance.getName(), ex);
throw new ServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR);
}

return Bytes.concat(head, logBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IMasterLogService;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -381,18 +379,9 @@ public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
return;
}
for (TaskInstance taskInstance : needToDeleteTaskInstances) {
// delete log
if (StringUtils.isNotEmpty(taskInstance.getLogPath())) {
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
masterLogService.removeLogicTaskInstanceLog(taskInstance.getLogPath());
} else {
IWorkerLogService workerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IWorkerLogService.class);
workerLogService.removeTaskInstanceLog(taskInstance.getLogPath());
}
}
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
}

dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId);
Expand Down
Loading

0 comments on commit 43f5f24

Please sign in to comment.