Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-2902][improvement] improvement flink cluster verifyClusterConnection #2903

Merged
merged 1 commit into from
Jul 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -156,51 +153,6 @@ public URI getRemoteURI() {
return null;
}

/**
* Verify the cluster connection whether is valid.
*
* @return <code>false</code> if the connection of the cluster is invalid, <code>true</code> else.
*/
public boolean verifyClusterConnection() {
if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
if (address == null) {
return false;
}
// 1) check url is Legal
if (!CommonUtils.isLegalUrl(address)) {
return false;
}
// 2) check connection
try {
String restUrl = address + "/overview";
String result =
HttpClientUtils.httpGetRequest(
restUrl,
RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
JacksonUtils.read(result, Overview.class);
return true;
} catch (Exception ignored) {
//
}
return false;
}
if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
try {
String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview";
String result =
HttpClientUtils.httpGetRequest(
restUrl,
RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
JacksonUtils.read(result, Overview.class);
return true;
} catch (Exception ignored) {
//
}
return false;
}
return false;
}

@JsonIgnore
public Map<String, String> getFlinkConfig() throws JsonProcessingException {
String restUrl = this.address + "/jobmanager/config";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.YarnQueueService;
import org.apache.streampark.console.core.task.FlinkClusterWatcher;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
Expand Down Expand Up @@ -216,6 +217,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli

@Autowired private YarnQueueService yarnQueueService;

@Autowired private FlinkClusterWatcher flinkClusterWatcher;

@PostConstruct
public void resetOptionState() {
this.baseMapper.resetOptionState();
Expand Down Expand Up @@ -430,7 +433,7 @@ public boolean checkEnv(Application appParam) throws ApplicationException {
if (ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())
|| ExecutionMode.REMOTE.equals(application.getExecutionModeEnum())) {
FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
boolean conned = flinkCluster.verifyClusterConnection();
boolean conned = flinkClusterWatcher.verifyClusterConnection(flinkCluster);
if (!conned) {
throw new ApiAlertException("the target cluster is unavailable, please check!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli

@Autowired private YarnQueueService yarnQueueService;

@Autowired private FlinkClusterWatcher flinkClusterWatcher;

@Override
public ResponseResult check(FlinkCluster cluster) {
ResponseResult result = new ResponseResult();
Expand All @@ -118,14 +120,14 @@ public ResponseResult check(FlinkCluster cluster) {

// 3) Check connection
if (ExecutionMode.isRemoteMode(cluster.getExecutionModeEnum())
&& !cluster.verifyClusterConnection()) {
&& !flinkClusterWatcher.verifyClusterConnection(cluster)) {
result.setMsg("The remote cluster connection failed, please check!");
result.setStatus(3);
return result;
}
if (ExecutionMode.isYarnMode(cluster.getExecutionModeEnum())
&& cluster.getClusterId() != null
&& !cluster.verifyClusterConnection()) {
&& !flinkClusterWatcher.verifyClusterConnection(cluster)) {
result.setMsg("The flink cluster connection failed, please check!");
result.setStatus(4);
return result;
Expand Down Expand Up @@ -411,7 +413,7 @@ private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
ApiAlertException.throwIfFalse(
ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Current cluster is not active, please check!");
if (!flinkCluster.verifyClusterConnection()) {
if (!flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
flinkCluster.setClusterState(ClusterState.LOST.getValue());
updateById(flinkCluster);
throw new ApiAlertException("Current cluster is not active, please check!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,7 @@ public ClusterState getClusterState(FlinkCluster flinkCluster) {
if (state != null) {
return state;
}
switch (flinkCluster.getExecutionModeEnum()) {
case REMOTE:
state = httpRemoteClusterState(flinkCluster);
break;
case YARN_SESSION:
state = httpYarnSessionClusterState(flinkCluster);
break;
default:
state = ClusterState.UNKNOWN;
break;
}
state = httpClusterState(flinkCluster);
if (ClusterState.isRunning(state)) {
FAILED_STATES.invalidate(flinkCluster.getId());
} else {
Expand Down Expand Up @@ -192,6 +182,23 @@ private ClusterState httpYarnSessionClusterState(FlinkCluster flinkCluster) {
return state;
}

/**
* get flink cluster state
*
* @param flinkCluster
* @return
*/
private ClusterState httpClusterState(FlinkCluster flinkCluster) {
switch (flinkCluster.getExecutionModeEnum()) {
case REMOTE:
return httpRemoteClusterState(flinkCluster);
case YARN_SESSION:
return httpYarnSessionClusterState(flinkCluster);
default:
return ClusterState.UNKNOWN;
}
}

/**
* cluster get state from flink rest api
*
Expand Down Expand Up @@ -272,9 +279,18 @@ public static void unWatching(FlinkCluster flinkCluster) {
* @return
*/
private ClusterState yarnStateConvertClusterState(YarnApplicationState state) {
if (state == YarnApplicationState.FINISHED) {
return ClusterState.CANCELED;
}
return ClusterState.of(state.toString());
return state == YarnApplicationState.FINISHED
? ClusterState.CANCELED
: ClusterState.of(state.toString());
}

/**
* Verify the cluster connection whether is valid.
*
* @return <code>false</code> if the connection of the cluster is invalid, <code>true</code> else.
*/
public Boolean verifyClusterConnection(FlinkCluster flinkCluster) {
ClusterState clusterState = httpClusterState(flinkCluster);
return ClusterState.isRunning(clusterState) ? true : false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.streampark.console.core.task;

import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
Expand Down Expand Up @@ -784,8 +783,7 @@ private void doAlert(Application app, FlinkAppState appState) {
case YARN_SESSION:
case REMOTE:
FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster);
if (ClusterState.isRunning(clusterState)) {
if (flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
log.info(
"application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert",
app.getId(),
Expand Down
Loading