diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 32feec747b..9684c84064 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -24,10 +24,7 @@
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.util.YarnUtils;
-import org.apache.streampark.console.base.util.CommonUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
-import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.utils.YarnQueueLabelExpression;
import org.apache.commons.lang3.StringUtils;
@@ -156,51 +153,6 @@ public URI getRemoteURI() {
return null;
}
- /**
- * Verify the cluster connection whether is valid.
- *
- * @return false
if the connection of the cluster is invalid, true
else.
- */
- public boolean verifyClusterConnection() {
- if (ExecutionMode.REMOTE.equals(this.getExecutionModeEnum())) {
- if (address == null) {
- return false;
- }
- // 1) check url is Legal
- if (!CommonUtils.isLegalUrl(address)) {
- return false;
- }
- // 2) check connection
- try {
- String restUrl = address + "/overview";
- String result =
- HttpClientUtils.httpGetRequest(
- restUrl,
- RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
- JacksonUtils.read(result, Overview.class);
- return true;
- } catch (Exception ignored) {
- //
- }
- return false;
- }
- if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
- try {
- String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview";
- String result =
- HttpClientUtils.httpGetRequest(
- restUrl,
- RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
- JacksonUtils.read(result, Overview.class);
- return true;
- } catch (Exception ignored) {
- //
- }
- return false;
- }
- return false;
- }
-
@JsonIgnore
public Map getFlinkConfig() throws JsonProcessingException {
String restUrl = this.address + "/jobmanager/config";
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 8867567708..7c9140dc1c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -78,6 +78,7 @@
import org.apache.streampark.console.core.service.SettingService;
import org.apache.streampark.console.core.service.VariableService;
import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.FlinkClusterWatcher;
import org.apache.streampark.console.core.task.FlinkHttpWatcher;
import org.apache.streampark.flink.client.FlinkClient;
import org.apache.streampark.flink.client.bean.CancelRequest;
@@ -216,6 +217,8 @@ public class ApplicationServiceImpl extends ServiceImplfalse if the connection of the cluster is invalid, true
else.
+ */
+ public Boolean verifyClusterConnection(FlinkCluster flinkCluster) {
+ ClusterState clusterState = httpClusterState(flinkCluster);
+ return ClusterState.isRunning(clusterState) ? true : false;
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
index f6ed5c96c5..26302387ad 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkHttpWatcher.java
@@ -17,7 +17,6 @@
package org.apache.streampark.console.core.task;
-import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.util.HttpClientUtils;
import org.apache.streampark.common.util.ThreadUtils;
@@ -784,8 +783,7 @@ private void doAlert(Application app, FlinkAppState appState) {
case YARN_SESSION:
case REMOTE:
FlinkCluster flinkCluster = flinkClusterService.getById(app.getFlinkClusterId());
- ClusterState clusterState = flinkClusterWatcher.getClusterState(flinkCluster);
- if (ClusterState.isRunning(clusterState)) {
+ if (flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
log.info(
"application with id {} is yarn session or remote and flink cluster with id {} is alive, application send alert",
app.getId(),