Skip to content

Commit

Permalink
Merge pull request #1115 from shinobi6xp/fixCopyFlowCannotPublishToDS
Browse files Browse the repository at this point in the history
fixbug:解决复制flow后发布到调度平台失败的问题
  • Loading branch information
zqburde authored Mar 18, 2024
2 parents c7305a8 + 916ca1f commit d32bbd9
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package com.webank.wedatasphere.dss.orchestrator.server.job;

import com.google.common.collect.Lists;
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationCreationOperation;
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationService;
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.DSSOrchestrationContentRequestRef;
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.OrchestrationResponseRef;
import com.webank.wedatasphere.dss.common.exception.DSSErrorException;
import com.webank.wedatasphere.dss.common.label.DSSLabel;
import com.webank.wedatasphere.dss.common.utils.MapUtils;
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorCopyInfo;
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorInfo;
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorVersion;
import com.webank.wedatasphere.dss.orchestrator.common.entity.*;
import com.webank.wedatasphere.dss.orchestrator.common.ref.OrchestratorRefConstant;
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
import com.webank.wedatasphere.dss.orchestrator.core.utils.OrchestratorUtils;
import com.webank.wedatasphere.dss.orchestrator.db.dao.OrchestratorMapper;
import com.webank.wedatasphere.dss.orchestrator.publish.utils.OrchestrationDevelopmentOperationUtils;
import com.webank.wedatasphere.dss.orchestrator.server.entity.vo.OrchestratorCopyVo;
import com.webank.wedatasphere.dss.orchestrator.server.service.OrchestratorService;
import com.webank.wedatasphere.dss.orchestrator.server.service.impl.OrchestratorFrameworkServiceImpl;
import com.webank.wedatasphere.dss.standard.app.development.operation.RefCopyOperation;
import com.webank.wedatasphere.dss.standard.app.development.ref.CopyRequestRef;
import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentResponseRef;
Expand All @@ -33,6 +38,12 @@ public class OrchestratorCopyJob implements Runnable {

protected OrchestratorCopyEnv orchestratorCopyEnv;

private OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl;

private OrchestratorService orchestratorService;

private OrchestratorMapper orchestratorMapper;

private DSSOrchestratorCopyInfo orchestratorCopyInfo = new DSSOrchestratorCopyInfo(UUID.randomUUID().toString());


Expand Down Expand Up @@ -66,9 +77,10 @@ private void copyOrchestrator() {
newOrchestrator.setDesc("copy from " + sourceOrchestrator.getName());
newOrchestrator.setUpdateTime(null);
newOrchestrator.setUpdateUser(null);
DSSOrchestratorVersion dssOrchestratorVersion = null;

try {
doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
dssOrchestratorVersion = doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
orchestratorCopyVo.getTargetProjectName(), Lists.newArrayList(orchestratorCopyVo.getDssLabel()), appId);
} catch (Exception e) {
//保存错误信息
Expand All @@ -91,9 +103,37 @@ private void copyOrchestrator() {
orchestratorCopyInfo.setSuccessNode(Lists.newArrayList("All"));
orchestratorCopyInfo.setStatus(1);
orchestratorCopyEnv.getOrchestratorCopyJobMapper().updateCopyStatus(orchestratorCopyInfo);

List<DSSLabel> dssLabels = new ArrayList<>();
dssLabels.add(orchestratorCopyVo.getDssLabel());

//2.如果调度系统要求同步创建工作流,向调度系统发送创建工作流的请求
OrchestrationResponseRef orchestrationResponseRef = orchestratorFrameworkServiceImpl.tryOrchestrationOperation(dssLabels, true, orchestratorCopyVo.getUsername(),
orchestratorCopyVo.getTargetProjectName(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
OrchestrationService::getOrchestrationCreationOperation,
(structureOperation, structureRequestRef) -> ((OrchestrationCreationOperation) structureOperation)
.createOrchestration((DSSOrchestrationContentRequestRef) structureRequestRef), "create");

try {
orchestratorService.copyOrchestrator(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), orchestratorCopyVo.getTargetProjectName(),
orchestratorCopyVo.getTargetProjectId(), newOrchestrator.getDesc(), newOrchestrator, dssLabels);
} catch (Exception e) {
throw new RuntimeException("error happened when copying orc.", e);
}

Long orchestratorId = newOrchestrator.getId();
Long orchestratorVersionId = dssOrchestratorVersion.getId();
//4.将工程和orchestrator的关系存储到的数据库中
if (orchestrationResponseRef != null) {
Long refProjectId = (Long) orchestrationResponseRef.toMap().get("refProjectId");
orchestratorMapper.addOrchestratorRefOrchestration(new DSSOrchestratorRefOrchestration(orchestratorId, refProjectId, orchestrationResponseRef.getRefOrchestrationId()));
} else {
LOGGER.info("copy orchestration {} with orchestratorId is {}, and versionId is {}, and orchestrationResponseRef is null.", newOrchestrator.getName(), orchestratorId, orchestratorVersionId);

}
}

private void doOrchestratorCopy(String userName,
private DSSOrchestratorVersion doOrchestratorCopy(String userName,
Workspace workspace,
DSSOrchestratorInfo dssOrchestratorInfo,
String projectName,
Expand Down Expand Up @@ -132,6 +172,7 @@ private void doOrchestratorCopy(String userName,
orchestratorCopyEnv.getOrchestratorMapper().addOrchestrator(dssOrchestratorInfo);
dssOrchestratorVersion.setOrchestratorId(dssOrchestratorInfo.getId());
orchestratorCopyEnv.getOrchestratorMapper().addOrchestratorVersion(dssOrchestratorVersion);
return dssOrchestratorVersion;
}


Expand All @@ -158,4 +199,28 @@ public DSSOrchestratorCopyInfo getOrchestratorCopyInfo() {
public void setOrchestratorCopyInfo(DSSOrchestratorCopyInfo orchestratorCopyInfo) {
this.orchestratorCopyInfo = orchestratorCopyInfo;
}

public OrchestratorFrameworkServiceImpl getOrchestratorFrameworkServiceImpl() {
return orchestratorFrameworkServiceImpl;
}

public void setOrchestratorFrameworkServiceImpl(OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl) {
this.orchestratorFrameworkServiceImpl = orchestratorFrameworkServiceImpl;
}

public OrchestratorService getOrchestratorService() {
return orchestratorService;
}

public void setOrchestratorService(OrchestratorService orchestratorService) {
this.orchestratorService = orchestratorService;
}

public OrchestratorMapper getOrchestratorMapper() {
return orchestratorMapper;
}

public void setOrchestratorMapper(OrchestratorMapper orchestratorMapper) {
this.orchestratorMapper = orchestratorMapper;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ void deleteOrchestrator(String userName,
String projectName,
Long orchestratorInfoId,
List<DSSLabel> dssLabels) throws Exception;
/**
* 复制编排
*
*/
OrchestratorVo copyOrchestrator(String userName,
Workspace workspace,
String projectName,
Long projectId,
String description,
DSSOrchestratorInfo dssOrchestratorInfo,
List<DSSLabel> dssLabels) throws Exception;


/**
* 解锁编排对应的工作流
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public CommonOrchestratorVo createOrchestrator(String username, OrchestratorCrea
return commonOrchestratorVo;
}

private <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestrationOperation(List<DSSLabel> dssLabels, Boolean askProjectSender, String userName, String projectName,
public <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestrationOperation(List<DSSLabel> dssLabels, Boolean askProjectSender, String userName, String projectName,
Workspace workspace, DSSOrchestratorInfo dssOrchestrator,
Function<OrchestrationService, StructureOperation> getOrchestrationOperation,
BiFunction<StructureOperation, K, V> responseRefConsumer, String operationName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,47 @@ public void deleteOrchestrator(String userName,
orchestratorMapper.deleteOrchestrator(orchestratorInfoId);
}

@Override
@Transactional(rollbackFor = Exception.class)
public OrchestratorVo copyOrchestrator(String userName,
Workspace workspace,
String projectName,
Long projectId,
String description,
DSSOrchestratorInfo dssOrchestratorInfo,
List<DSSLabel> dssLabels) throws Exception {
OrchestratorVo orchestratorVo = new OrchestratorVo();
//todo 增加校验
String uuid = UUID.randomUUID().toString();

//作为Orchestrator的唯一标识,包括跨环境导入导出也不发生变化。
dssOrchestratorInfo.setUUID(uuid);

String version = OrchestratorUtils.generateNewVersion();
String contextId = contextService.createContextID(workspace.getWorkspaceName(), projectName, dssOrchestratorInfo.getName(), version, userName);
LOGGER.info("Create a new ContextId: {} for new orchestrator {}.", contextId, dssOrchestratorInfo.getName());
//1. 访问DSS工作流微模块创建工作流
RefJobContentResponseRef appRef = tryRefOperation(dssOrchestratorInfo, userName, workspace, dssLabels, null,
developmentService -> ((RefCRUDService) developmentService).getRefCreationOperation(),
dssContextRequestRef -> dssContextRequestRef.setContextId(contextId),
projectRefRequestRef -> projectRefRequestRef.setProjectName(projectName).setRefProjectId(projectId),
(developmentOperation, developmentRequestRef) -> {
DSSOrchestrator dssOrchestrator = orchestratorManager.getOrCreateOrchestrator(userName,
workspace.getWorkspaceName(), dssOrchestratorInfo.getType(), dssLabels);
Map<String, Object> dssJobContent = MapUtils.newCommonMapBuilder()
.put(OrchestratorRefConstant.DSS_ORCHESTRATOR_INFO_KEY, dssOrchestratorInfo)
.put(OrchestratorRefConstant.ORCHESTRATOR_VERSION_KEY, version)
.put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, Optional.ofNullable(dssOrchestrator)
.map(DSSOrchestrator::getSchedulerAppConn).map(AppConn::getAppDesc).map(AppDesc::getAppName)
.map(Object::toString).orElse("NULL")).build();
DSSJobContentRequestRef requestRef = (DSSJobContentRequestRef) developmentRequestRef;
requestRef.setDSSJobContent(dssJobContent);
return ((RefCreationOperation) developmentOperation).createRef(requestRef);
}, "create");

return orchestratorVo;
}

@Override
public OrchestratorUnlockVo unlockOrchestrator(String userName,
Workspace workspace,
Expand Down

0 comments on commit d32bbd9

Please sign in to comment.