diff --git a/datasophon-service/src/main/java/com/datasophon/api/utils/ProcessUtils.java b/datasophon-service/src/main/java/com/datasophon/api/utils/ProcessUtils.java
index 8c383dff..4381fcfa 100644
--- a/datasophon-service/src/main/java/com/datasophon/api/utils/ProcessUtils.java
+++ b/datasophon-service/src/main/java/com/datasophon/api/utils/ProcessUtils.java
@@ -143,9 +143,7 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
                 SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceWebuisService.class);
         ClusterServiceInstanceRoleGroupService roleGroupService =
                 SpringTool.getApplicationContext().getBean(ClusterServiceInstanceRoleGroupService.class);
-
         ClusterInfoEntity clusterInfo = clusterInfoService.getById(serviceRoleInfo.getClusterId());
-
         ClusterServiceInstanceEntity clusterServiceInstance = serviceInstanceService.getServiceInstanceByClusterIdAndServiceName(serviceRoleInfo.getClusterId(), serviceRoleInfo.getParentName());
@@ -177,7 +175,6 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
         }
         Integer roleGroupId = (Integer) CacheUtils.get("UseRoleGroup_" + clusterServiceInstance.getId());
         ClusterServiceInstanceRoleGroup roleGroup = roleGroupService.getById(roleGroupId);
-
         // save role instance
         ClusterServiceRoleInstanceEntity roleInstanceEntity = serviceRoleInstanceService
                 .getOneServiceRole(serviceRoleInfo.getName(), serviceRoleInfo.getHostname(), clusterInfo.getId());
@@ -222,20 +219,15 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
                 webuisService.save(webuis);
                 globalVariables.remove("${host}");
             }
-
         }
-
     }
-
     public static void saveHostInstallInfo(StartWorkerMessage message, String clusterCode, ClusterHostService clusterHostService) {
         ClusterInfoService clusterInfoService = SpringTool.getApplicationContext().getBean(ClusterInfoService.class);
         ClusterHostDO clusterHostDO = new ClusterHostDO();
         BeanUtil.copyProperties(message, clusterHostDO);
-
         ClusterInfoEntity cluster = clusterInfoService.getClusterByClusterCode(clusterCode);
-
         clusterHostDO.setClusterId(cluster.getId());
         clusterHostDO.setCheckTime(new Date());
         clusterHostDO.setRack("/default-rack");
@@ -246,7 +238,6 @@ public static void saveHostInstallInfo(StartWorkerMessage message, String cluste
         clusterHostDO.setManaged(MANAGED.YES);
         clusterHostService.save(clusterHostDO);
     }
-
     public static void updateCommandStateToFailed(List<String> commandIds) {
         for (String commandId : commandIds) {
             logger.info("command id is {}", commandId);
@@ -283,12 +274,10 @@ public static void updateCommandStateToFailed(List<String> commandIds) {
             }
         }
     }
-
     public static void tellCommandActorResult(String serviceName, ExecuteServiceRoleCommand executeServiceRoleCommand, ServiceExecuteState state) {
         ActorRef serviceExecuteResultActor = ActorUtils.getLocalActor(ServiceExecuteResultActor.class,
                 ActorUtils.getActorRefName(ServiceExecuteResultActor.class));
-
         ServiceExecuteResultMessage serviceExecuteResultMessage = new ServiceExecuteResultMessage();
         serviceExecuteResultMessage.setServiceExecuteState(state);
         serviceExecuteResultMessage.setDag(executeServiceRoleCommand.getDag());
@@ -302,15 +291,12 @@ public static void tellCommandActorResult(String serviceName, ExecuteServiceRole
         serviceExecuteResultMessage.setErrorTaskList(executeServiceRoleCommand.getErrorTaskList());
         serviceExecuteResultMessage.setReadyToSubmitTaskList(executeServiceRoleCommand.getReadyToSubmitTaskList());
         serviceExecuteResultMessage.setCompleteTaskList(executeServiceRoleCommand.getCompleteTaskList());
-
         serviceExecuteResultActor.tell(serviceExecuteResultMessage, ActorRef.noSender());
     }
-
     public static ClusterServiceCommandHostCommandEntity handleCommandResult(String hostCommandId, Boolean execResult, String execOut) {
         ClusterServiceCommandHostCommandService service = SpringTool.getApplicationContext().getBean(ClusterServiceCommandHostCommandService.class);
-
         ClusterServiceCommandHostCommandEntity hostCommand = service.getByHostCommandId(hostCommandId);
         hostCommand.setCommandProgress(100);
         if (execResult) {
@@ -335,14 +321,12 @@ public static ClusterServiceCommandHostCommandEntity handleCommandResult(String
         } else {
             message.setServiceRoleType(ServiceRoleType.WORKER);
         }
-
         ActorRef commandActor = ActorUtils.getLocalActor(ServiceCommandActor.class, "commandActor");
         ActorUtils.actorSystem.scheduler().scheduleOnce(FiniteDuration.apply(
                 1L, TimeUnit.SECONDS), commandActor, message,
                 ActorUtils.actorSystem.dispatcher(), ActorRef.noSender());
-
         return hostCommand;
     }
@@ -469,17 +453,36 @@ public static void hdfsEcMethond(Integer serviceInstanceId, ClusterServiceRoleIn
                 .eq(ClusterServiceRoleInstanceEntity::getServiceId, serviceInstanceId)
                 .eq(ClusterServiceRoleInstanceEntity::getServiceRoleName, roleName)
                 .list();
-
-        // Update the whitelist on the NameNode hosts
+        ClusterVariableService variableService =
+                SpringTool.getApplicationContext().getBean(ClusterVariableService.class);
+        // Update the whitelist or blacklist on the NameNode hosts; these map to the files configured by dfs.hosts and dfs.hosts.exclude respectively
         for (ClusterServiceRoleInstanceEntity namenode : namenodes) {
             ActorSelection actorSelection = ActorUtils.actorSystem.actorSelection(
                     "akka.tcp://datasophon@" + namenode.getHostname() + ":2552/user/worker/fileOperateActor");
             ActorSelection execCmdActor = ActorUtils.actorSystem.actorSelection(
                     "akka.tcp://datasophon@" + namenode.getHostname() + ":2552/user/worker/executeCmdActor");
             Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS));
+            // The DataNode hostnames are written as the file content
             FileOperateCommand fileOperateCommand = new FileOperateCommand();
             fileOperateCommand.setLines(list);
-            fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
+            // fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
+            if ("whitelist".equals(type)) {
+                // Prefer the whitelist file path configured through the web UI variable instead of a hard-coded path
+                ClusterVariable clusterVariable = variableService.getVariableByVariableName("${dfs.hosts}", namenode.getClusterId());
+                if (!Objects.isNull(clusterVariable) && !StringUtils.isBlank(clusterVariable.getVariableValue())) {
+                    logger.info("dfs.hosts value is {}", clusterVariable.getVariableValue());
+                    fileOperateCommand.setPath(clusterVariable.getVariableValue());
+                } else {
+                    fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
+                }
+            } else {
+                ClusterVariable clusterVariable = variableService.getVariableByVariableName("${dfs.hosts.exclude}", namenode.getClusterId());
+                if (!Objects.isNull(clusterVariable) && !StringUtils.isBlank(clusterVariable.getVariableValue())) {
+                    fileOperateCommand.setPath(clusterVariable.getVariableValue());
+                } else {
+                    fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
+                }
+            }
             Future<Object> future = Patterns.ask(actorSelection, fileOperateCommand, timeout);
             ExecResult fileOperateResult = (ExecResult) Await.result(future, timeout.duration());
             if (Objects.nonNull(fileOperateResult) && fileOperateResult.getExecResult()) {
diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/WorkerApplicationServer.java b/datasophon-worker/src/main/java/com/datasophon/worker/WorkerApplicationServer.java
index f8c06bff..9ff4bcb3 100644
--- a/datasophon-worker/src/main/java/com/datasophon/worker/WorkerApplicationServer.java
+++ b/datasophon-worker/src/main/java/com/datasophon/worker/WorkerApplicationServer.java
@@ -106,6 +106,7 @@ public static void main(String[] args) throws UnknownHostException {
     }
 
     private static void initUserMap(Map userMap) {
+        userMap.put("hadoop", HADOOP);
         userMap.put("hdfs", HADOOP);
         userMap.put("yarn", HADOOP);
         userMap.put("hive", HADOOP);
diff --git a/datasophon-worker/src/main/resources/templates/xml.ftl b/datasophon-worker/src/main/resources/templates/xml.ftl
index 91773ace..438432d0 100644
--- a/datasophon-worker/src/main/resources/templates/xml.ftl
+++ b/datasophon-worker/src/main/resources/templates/xml.ftl
@@ -4,7 +4,15 @@
 <#list itemList as item>
     <property>
         <name>${item.name}</name>
+<#if item.value?contains("\\n")>
+        <value>
+<#list item.value?split("\\n") as line>
+            ${line}
+</#list>
+        </value>
+<#else>
         <value>${item.value}</value>
+</#if>
     </property>
 </#list>
 </configuration>
\ No newline at end of file
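
For reference, the whitelist/blacklist branch added to hdfsEcMethond is a lookup-with-fallback on the cluster variables ${dfs.hosts} and ${dfs.hosts.exclude}. A minimal Java sketch of that resolution logic, assuming the project classes referenced in the diff (ClusterVariableService, ClusterVariable, Constants) and a hypothetical helper name resolveHostsFilePath:

    // Sketch only: mirrors the path resolution added in hdfsEcMethond. resolveHostsFilePath
    // is a hypothetical helper; the types and calls are the ones already used in the diff.
    private static String resolveHostsFilePath(ClusterVariableService variableService,
                                               Integer clusterId, String type) {
        // dfs.hosts backs the whitelist file, dfs.hosts.exclude backs the blacklist file
        String variableName = "whitelist".equals(type) ? "${dfs.hosts}" : "${dfs.hosts.exclude}";
        ClusterVariable variable = variableService.getVariableByVariableName(variableName, clusterId);
        if (!Objects.isNull(variable) && !StringUtils.isBlank(variable.getVariableValue())) {
            // A path configured through the web UI variable wins over the default layout
            return variable.getVariableValue();
        }
        // Fall back to the stack's default Hadoop configuration directory
        return Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type;
    }

With such a helper, both branches of the new if/else would reduce to a single call, e.g. fileOperateCommand.setPath(resolveHostsFilePath(variableService, namenode.getClusterId(), type)); the patch itself keeps the logic inline.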