Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][service] 通过页面dfs.hosts配置的值创建白名单文件 (#622) #623

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceWebuisService.class);
ClusterServiceInstanceRoleGroupService roleGroupService =
SpringTool.getApplicationContext().getBean(ClusterServiceInstanceRoleGroupService.class);

ClusterInfoEntity clusterInfo = clusterInfoService.getById(serviceRoleInfo.getClusterId());

ClusterServiceInstanceEntity clusterServiceInstance =
serviceInstanceService.getServiceInstanceByClusterIdAndServiceName(serviceRoleInfo.getClusterId(),
serviceRoleInfo.getParentName());
Expand Down Expand Up @@ -177,7 +175,6 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
}
Integer roleGroupId = (Integer) CacheUtils.get("UseRoleGroup_" + clusterServiceInstance.getId());
ClusterServiceInstanceRoleGroup roleGroup = roleGroupService.getById(roleGroupId);

// save role instance
ClusterServiceRoleInstanceEntity roleInstanceEntity = serviceRoleInstanceService
.getOneServiceRole(serviceRoleInfo.getName(), serviceRoleInfo.getHostname(), clusterInfo.getId());
Expand Down Expand Up @@ -222,20 +219,15 @@ public static void saveServiceInstallInfo(ServiceRoleInfo serviceRoleInfo) {
webuisService.save(webuis);
globalVariables.remove("${host}");
}

}
}

}

public static void saveHostInstallInfo(StartWorkerMessage message, String clusterCode,
ClusterHostService clusterHostService) {
ClusterInfoService clusterInfoService = SpringTool.getApplicationContext().getBean(ClusterInfoService.class);
ClusterHostDO clusterHostDO = new ClusterHostDO();
BeanUtil.copyProperties(message, clusterHostDO);

ClusterInfoEntity cluster = clusterInfoService.getClusterByClusterCode(clusterCode);

clusterHostDO.setClusterId(cluster.getId());
clusterHostDO.setCheckTime(new Date());
clusterHostDO.setRack("/default-rack");
Expand All @@ -246,7 +238,6 @@ public static void saveHostInstallInfo(StartWorkerMessage message, String cluste
clusterHostDO.setManaged(MANAGED.YES);
clusterHostService.save(clusterHostDO);
}

public static void updateCommandStateToFailed(List<String> commandIds) {
for (String commandId : commandIds) {
logger.info("command id is {}", commandId);
Expand Down Expand Up @@ -283,12 +274,10 @@ public static void updateCommandStateToFailed(List<String> commandIds) {
}
}
}

public static void tellCommandActorResult(String serviceName, ExecuteServiceRoleCommand executeServiceRoleCommand,
ServiceExecuteState state) {
ActorRef serviceExecuteResultActor = ActorUtils.getLocalActor(ServiceExecuteResultActor.class,
ActorUtils.getActorRefName(ServiceExecuteResultActor.class));

ServiceExecuteResultMessage serviceExecuteResultMessage = new ServiceExecuteResultMessage();
serviceExecuteResultMessage.setServiceExecuteState(state);
serviceExecuteResultMessage.setDag(executeServiceRoleCommand.getDag());
Expand All @@ -302,15 +291,12 @@ public static void tellCommandActorResult(String serviceName, ExecuteServiceRole
serviceExecuteResultMessage.setErrorTaskList(executeServiceRoleCommand.getErrorTaskList());
serviceExecuteResultMessage.setReadyToSubmitTaskList(executeServiceRoleCommand.getReadyToSubmitTaskList());
serviceExecuteResultMessage.setCompleteTaskList(executeServiceRoleCommand.getCompleteTaskList());

serviceExecuteResultActor.tell(serviceExecuteResultMessage, ActorRef.noSender());
}

public static ClusterServiceCommandHostCommandEntity handleCommandResult(String hostCommandId, Boolean execResult,
String execOut) {
ClusterServiceCommandHostCommandService service =
SpringTool.getApplicationContext().getBean(ClusterServiceCommandHostCommandService.class);

ClusterServiceCommandHostCommandEntity hostCommand = service.getByHostCommandId(hostCommandId);
hostCommand.setCommandProgress(100);
if (execResult) {
Expand All @@ -335,14 +321,12 @@ public static ClusterServiceCommandHostCommandEntity handleCommandResult(String
} else {
message.setServiceRoleType(ServiceRoleType.WORKER);
}

ActorRef commandActor = ActorUtils.getLocalActor(ServiceCommandActor.class, "commandActor");
ActorUtils.actorSystem.scheduler().scheduleOnce(FiniteDuration.apply(
1L, TimeUnit.SECONDS),
commandActor, message,
ActorUtils.actorSystem.dispatcher(),
ActorRef.noSender());

return hostCommand;
}

Expand Down Expand Up @@ -469,17 +453,36 @@ public static void hdfsEcMethond(Integer serviceInstanceId, ClusterServiceRoleIn
.eq(ClusterServiceRoleInstanceEntity::getServiceId, serviceInstanceId)
.eq(ClusterServiceRoleInstanceEntity::getServiceRoleName, roleName)
.list();

// 更新namenode节点的whitelist白名单
ClusterVariableService variableService =
SpringTool.getApplicationContext().getBean(ClusterVariableService.class);
// 更新namenode节点的whitelist白名单或blacklist黑名单,分别是dfs.hosts、dfs.hosts.exclude 对应的配置文件
for (ClusterServiceRoleInstanceEntity namenode : namenodes) {
ActorSelection actorSelection = ActorUtils.actorSystem.actorSelection(
"akka.tcp://datasophon@" + namenode.getHostname() + ":2552/user/worker/fileOperateActor");
ActorSelection execCmdActor = ActorUtils.actorSystem.actorSelection(
"akka.tcp://datasophon@" + namenode.getHostname() + ":2552/user/worker/executeCmdActor");
Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS));
// DataNode 节点的 hostname 作为文件内容
FileOperateCommand fileOperateCommand = new FileOperateCommand();
fileOperateCommand.setLines(list);
fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
// fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
if ("whitelist".equals(type)) {
// 优先获取通过页面白名单文件变量配置的路径,而不是写死文件路径
ClusterVariable clusterVariable = variableService.getVariableByVariableName("${dfs.hosts}", namenode.getClusterId());
if (!Objects.isNull(clusterVariable) && !StringUtils.isBlank(clusterVariable.getVariableValue())) {
logger.info("dfs.hosts value is {}", clusterVariable.getVariableValue());
fileOperateCommand.setPath(clusterVariable.getVariableValue());
} else {
fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
}
} else {
ClusterVariable clusterVariable = variableService.getVariableByVariableName("${dfs.hosts.exclude}", namenode.getClusterId());
if (!Objects.isNull(clusterVariable) && !StringUtils.isBlank(clusterVariable.getVariableValue())) {
fileOperateCommand.setPath(clusterVariable.getVariableValue());
} else {
fileOperateCommand.setPath(Constants.INSTALL_PATH + "/hadoop/etc/hadoop/" + type);
}
}
Future<Object> future = Patterns.ask(actorSelection, fileOperateCommand, timeout);
ExecResult fileOperateResult = (ExecResult) Await.result(future, timeout.duration());
if (Objects.nonNull(fileOperateResult) && fileOperateResult.getExecResult()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public static void main(String[] args) throws UnknownHostException {
}

private static void initUserMap(Map<String, String> userMap) {
userMap.put("hadoop", HADOOP);
userMap.put("hdfs", HADOOP);
userMap.put("yarn", HADOOP);
userMap.put("hive", HADOOP);
Expand Down
8 changes: 8 additions & 0 deletions datasophon-worker/src/main/resources/templates/xml.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@
<#list itemList as item>
<property>
<name>${item.name}</name>
<#if item.value?contains("\\n")>
<value>
<#list item.value?split("\\n") as line>
${line}
</#list>
</value>
<#else>
<value>${item.value}</value>
</#if>
</property>
</#list>
</configuration>