Skip to content

Commit

Permalink
Merge branch 'dev' into fix-workflow-instance-delete
Browse files Browse the repository at this point in the history
  • Loading branch information
qingwli authored Jan 17, 2024
2 parents 2b0834c + ace0860 commit 4a780f3
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;

import org.apache.commons.lang3.StringUtils;
Expand All @@ -44,20 +45,25 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
import com.fasterxml.jackson.databind.node.NullNode;

@Slf4j
public class DinkyTask extends AbstractRemoteTask {

private final TaskExecutionContext taskExecutionContext;

private DinkyParameters dinkyParameters;
private String jobInstanceId;
private boolean status;
private String dinkyVersion;

protected DinkyTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
Expand All @@ -79,73 +85,192 @@ public void init() {
}
}

// todo split handle to submit and track
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// Get dinky version
dinkyVersion = getDinkyVersion(this.dinkyParameters.getAddress());
super.handle(taskCallBack);
}

@Override
public void submitApplication() throws TaskException {
if (dinkyVersion.startsWith("0")) {
submitApplicationV0();
} else {
submitApplicationV1();
}
}

@Override
public void trackApplicationStatus() throws TaskException {
if (dinkyVersion.startsWith("0")) {
trackApplicationStatusV0();
} else {
trackApplicationStatusV1();
}
}

private void submitApplicationV0() {
try {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
boolean isOnline = this.dinkyParameters.isOnline();
JsonNode result;
String apiResultDatasKey = DinkyTaskConstants.API_RESULT_DATAS;
if (isOnline) {
// Online dinky task, and only one job is allowed to execute
result = onlineTask(address, taskId);
// Online dinky-0.6.5 task, and only one job is allowed to execute
result = onlineTaskV0(address, taskId);
} else {
// Submit dinky task
result = submitTask(address, taskId);
// Submit dinky-0.6.5 task
result = submitTaskV0(address, taskId);
}
if (checkResult(result)) {
boolean status = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("success").asBoolean();
String jobInstanceId = result.get(DinkyTaskConstants.API_RESULT_DATAS).get("jobInstanceId").asText();
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResult(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus =
jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format("%s-%s", address, taskId));
setExitStatusCode(exitStatusCode);
log.info("dinky task finished with results: {}",
result.get(DinkyTaskConstants.API_RESULT_DATAS));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(DinkyTaskConstants.API_RESULT_DATAS).get("error")
.asText());
finishFlag = true;
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
if (checkResultV0(result)) {
status = result.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean();
if (result.get(apiResultDatasKey).has(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID)
&& !(result.get(apiResultDatasKey)
.get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) instanceof NullNode)) {
jobInstanceId =
result.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID).asText();
}
}
} catch (InterruptedException ex) {
} catch (Exception ex) {
Thread.currentThread().interrupt();
log.error("Execute dinkyTask failed", ex);
log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("Execute dinkyTask failed", ex);
throw new TaskException(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
}
}

@Override
public void submitApplication() throws TaskException {
private void submitApplicationV1() {
try {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
boolean isOnline = this.dinkyParameters.isOnline();
JsonNode result;
String apiResultDataKey = DinkyTaskConstants.API_RESULT_DATA;
// Submit dinky-1.0.0 task
result = submitTaskV1(address, taskId, isOnline, generateVariables());
if (checkResultV1(result)) {
status = result.get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean();
if (result.get(apiResultDataKey).has(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID)
&& !(result.get(apiResultDataKey)
.get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID) instanceof NullNode)) {
jobInstanceId =
result.get(apiResultDataKey).get(DinkyTaskConstants.API_RESULT_JOB_INSTANCE_ID).asText();
}
} else {
log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG + "{}", result.get(DinkyTaskConstants.API_RESULT_MSG));
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException(
DinkyTaskConstants.SUBMIT_FAILED_MSG + result.get(DinkyTaskConstants.API_RESULT_MSG));
}
} catch (Exception ex) {
Thread.currentThread().interrupt();
log.error(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException(DinkyTaskConstants.SUBMIT_FAILED_MSG, ex);
}
}

public void trackApplicationStatusV0() throws TaskException {
try {
String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
if (status && jobInstanceId == null) {
// Use address-taskId as app id
setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
setExitStatusCode(mapStatusToExitCode(true));
log.info("Dinky common sql task finished.");
return;
}
String apiResultDatasKey = DinkyTaskConstants.API_RESULT_DATAS;
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResultV0(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus =
jobInstanceInfoResult.get(apiResultDatasKey).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
setExitStatusCode(exitStatusCode);
log.info("dinky task finished with results: {}",
jobInstanceInfoResult.get(apiResultDatasKey));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(
jobInstanceInfoResult.get(apiResultDatasKey).get(DinkyTaskConstants.API_RESULT_ERROR)
.asText());
finishFlag = true;
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.error(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
}
}

@Override
public void trackApplicationStatus() throws TaskException {
public void trackApplicationStatusV1() throws TaskException {
try {

String address = this.dinkyParameters.getAddress();
String taskId = this.dinkyParameters.getTaskId();
if (status && jobInstanceId == null) {
// Use address-taskId as app id
setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
setExitStatusCode(mapStatusToExitCode(true));
log.info("Dinky common sql task finished.");
return;
}
String apiResultDataKey = DinkyTaskConstants.API_RESULT_DATA;
boolean finishFlag = false;
while (!finishFlag) {
JsonNode jobInstanceInfoResult = getJobInstanceInfo(address, jobInstanceId);
if (!checkResultV1(jobInstanceInfoResult)) {
break;
}
String jobInstanceStatus =
jobInstanceInfoResult.get(apiResultDataKey).get("status").asText();
switch (jobInstanceStatus) {
case DinkyTaskConstants.STATUS_FINISHED:
final int exitStatusCode = mapStatusToExitCode(status);
// Use address-taskId as app id
setAppIds(String.format(DinkyTaskConstants.APPIDS_FORMAT, address, taskId));
setExitStatusCode(exitStatusCode);
log.info("dinky task finished with results: {}",
jobInstanceInfoResult.get(apiResultDataKey));
finishFlag = true;
break;
case DinkyTaskConstants.STATUS_FAILED:
case DinkyTaskConstants.STATUS_CANCELED:
case DinkyTaskConstants.STATUS_UNKNOWN:
errorHandle(jobInstanceInfoResult.get(apiResultDataKey).get(DinkyTaskConstants.API_RESULT_ERROR)
.asText());
finishFlag = true;
break;
default:
Thread.sleep(DinkyTaskConstants.SLEEP_MILLIS);
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.error(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException(DinkyTaskConstants.TRACK_FAILED_MSG, ex);
}
}

/**
* map dinky task status to exitStatusCode
*
Expand All @@ -160,15 +285,28 @@ private int mapStatusToExitCode(boolean status) {
}
}

private boolean checkResult(JsonNode result) {
if (result instanceof MissingNode || result == null) {
private boolean checkResultV0(JsonNode result) {
boolean isCorrect = true;
if (result instanceof MissingNode || result instanceof NullNode) {
errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS);
isCorrect = false;
} else if (result.get(DinkyTaskConstants.API_RESULT_CODE).asInt() == DinkyTaskConstants.API_ERROR) {
errorHandle(result.get(DinkyTaskConstants.API_RESULT_MSG));
isCorrect = false;
}
return isCorrect;
}

private boolean checkResultV1(JsonNode result) {
boolean isCorrect = true;
if (result instanceof MissingNode || result instanceof NullNode) {
errorHandle(DinkyTaskConstants.API_VERSION_ERROR_TIPS);
return false;
} else if (result.get("code").asInt() == DinkyTaskConstants.API_ERROR) {
errorHandle(result.get("msg"));
return false;
isCorrect = false;
} else if (!result.get(DinkyTaskConstants.API_RESULT_SUCCESS).asBoolean()) {
errorHandle(result.get(DinkyTaskConstants.API_RESULT_MSG));
isCorrect = false;
}
return true;
return isCorrect;
}

private void errorHandle(Object msg) {
Expand Down Expand Up @@ -196,18 +334,53 @@ public void cancelApplication() throws TaskException {
taskId);
}

private JsonNode submitTask(String address, String taskId) {
private Map<String, String> generateVariables() {
Map<String, String> variables = new ConcurrentHashMap<>();
List<Property> propertyList = JSONUtils.toList(taskExecutionContext.getGlobalParams(), Property.class);
if (propertyList != null && !propertyList.isEmpty()) {
for (Property property : propertyList) {
variables.put(property.getProp(), property.getValue());
}
}
List<Property> localParams = this.dinkyParameters.getLocalParams();
if (localParams == null || localParams.isEmpty()) {
return variables;
}
for (Property property : localParams) {
variables.put(property.getProp(), property.getValue());
}
return variables;
}

private String getDinkyVersion(String address) {
JsonNode versionJsonNode = parse(doGet(address + DinkyTaskConstants.GET_VERSION, new HashMap<>()));
if (versionJsonNode instanceof MissingNode || versionJsonNode == null
|| versionJsonNode.get(DinkyTaskConstants.API_RESULT_CODE).asInt() == DinkyTaskConstants.API_ERROR) {
return "0";
}
return versionJsonNode.get(DinkyTaskConstants.API_RESULT_DATA).asText();
}

private JsonNode submitTaskV0(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
return parse(doGet(address + DinkyTaskConstants.SUBMIT_TASK, params));
}

private JsonNode onlineTask(String address, String taskId) {
private JsonNode onlineTaskV0(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
return parse(doGet(address + DinkyTaskConstants.ONLINE_TASK, params));
}

private JsonNode submitTaskV1(String address, String taskId, boolean isOnline, Map<String, String> variables) {
Map<String, Object> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_TASK_ID, taskId);
params.put(DinkyTaskConstants.PARAM_TASK_IS_ONLINE, isOnline);
params.put(DinkyTaskConstants.PARAM_TASK_VARIABLES, variables);
return parse(sendJsonStr(address + DinkyTaskConstants.SUBMIT_TASK, JSONUtils.toJsonString(params)));
}

private JsonNode cancelTask(String address, String taskId) {
Map<String, String> params = new HashMap<>();
params.put(DinkyTaskConstants.PARAM_JSON_TASK_ID, taskId);
Expand Down Expand Up @@ -289,4 +462,5 @@ private String sendJsonStr(String url, String params) {
}
return result;
}

}
Loading

0 comments on commit 4a780f3

Please sign in to comment.