diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index 32feec747b..9684c84064 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -24,10 +24,7 @@ import org.apache.streampark.common.enums.ResolveOrder; import org.apache.streampark.common.util.HttpClientUtils; import org.apache.streampark.common.util.PropertiesUtils; -import org.apache.streampark.common.util.YarnUtils; -import org.apache.streampark.console.base.util.CommonUtils; import org.apache.streampark.console.base.util.JacksonUtils; -import org.apache.streampark.console.core.metrics.flink.Overview; import org.apache.streampark.console.core.utils.YarnQueueLabelExpression; import org.apache.commons.lang3.StringUtils; @@ -156,51 +153,6 @@ public URI getRemoteURI() { return null; } - /** - * Verify the cluster connection whether is valid. - * - * @return false if the connection of the cluster is invalid, true else. - */ - public boolean verifyClusterConnection() { - if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) { - if (address == null) { - return false; - } - // 1) check url is Legal - if (!CommonUtils.isLegalUrl(address)) { - return false; - } - // 2) check connection - try { - String restUrl = address + "/overview"; - String result = - HttpClientUtils.httpGetRequest( - restUrl, - RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build()); - JacksonUtils.read(result, Overview.class); - return true; - } catch (Exception ignored) { - // - } - return false; - } - if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) { - try { - String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview"; - String result = - HttpClientUtils.httpGetRequest( - restUrl, - RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build()); - JacksonUtils.read(result, Overview.class); - return true; - } catch (Exception ignored) { - // - } - return false; - } - return false; - } - @JsonIgnore public Map getFlinkConfig() throws JsonProcessingException { String restUrl = this.address + "/jobmanager/config"; diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 8867567708..7c9140dc1c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -78,6 +78,7 @@ import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.service.VariableService; import org.apache.streampark.console.core.service.YarnQueueService; +import org.apache.streampark.console.core.task.FlinkClusterWatcher; import org.apache.streampark.console.core.task.FlinkHttpWatcher; import org.apache.streampark.flink.client.FlinkClient; import org.apache.streampark.flink.client.bean.CancelRequest; @@ -216,6 +217,8 @@ public class ApplicationServiceImpl extends ServiceImplfalse if the connection of the cluster is invalid, true else. + */ + public Boolean verifyClusterConnection(FlinkCluster flinkCluster) { + ClusterState clusterState = httpClusterState(flinkCluster); + return ClusterState.isRunning(clusterState) ? true : false; } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java index f6ed5c96c5..26302387ad 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java @@ -17,7 +17,6 @@ package org.apache.streampark.console.core.task; -import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.common.util.HttpClientUtils; import org.apache.streampark.common.util.ThreadUtils; @@ -784,8 +783,7 @@ private void doAlert(Application app, FlinkAppState appState) { case YARN_SESSION: case REMOTE: FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId()); - ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster); - if (ClusterState.isRunning(clusterState)) { + if (flinkClusterWatcher.verifyClusterConnection(flinkCluster)) { log.info( "application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert", app.getId(),