From 52ed6865575272a174e22440763fdf64b8e736e5 Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Tue, 29 Oct 2024 19:33:04 +0800
Subject: [PATCH 1/9] zookeeper registry
---
pom.xml | 7 +
.../src/main/assembly/conf/config.yaml | 4 +
.../console/StreamParkConsoleBootstrap.java | 16 ++
.../console/core/service/RegistryService.java | 28 +++
.../service/impl/RegistryServiceImpl.java | 203 ++++++++++++++++++
.../core/service/RegistryServiceTest.java | 36 ++++
6 files changed, 294 insertions(+)
create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
diff --git a/pom.xml b/pom.xml
index 0c9fdbc156..4a65812bae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
3.3.4
2.1.10
3.3.0
+ 3.6.3
6.2.3
2.17
2.5.0
@@ -242,6 +243,12 @@
test
+
+ org.apache.zookeeper
+ zookeeper
+ ${zoopkeeper.version}
+
+
redis.clients
jedis
diff --git a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
index 084e991121..a12277c625 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
+++ b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
@@ -106,6 +106,10 @@ registry:
heartbeat-refresh-interval: 1s
session-timeout: 3s
+zookeeper:
+ # zookeeper address
+ address: localhost:2181
+
network:
# network interface preferred like eth0, default: empty
preferred-interface: ""
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
index 601fc7f00f..fda4c7d827 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
@@ -18,12 +18,16 @@
package org.apache.streampark.console;
import org.apache.streampark.console.base.config.SpringProperties;
+import org.apache.streampark.console.core.service.RegistryService;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
+import javax.annotation.PostConstruct;
+
/**
*
*
@@ -48,6 +52,9 @@
@EnableScheduling
public class StreamParkConsoleBootstrap {
+ @Autowired
+ private RegistryService registryService;
+
public static void main(String[] args) throws Exception {
new SpringApplicationBuilder()
.properties(SpringProperties.get())
@@ -55,4 +62,13 @@ public static void main(String[] args) throws Exception {
.run(args);
}
+ @PostConstruct
+ public void init() {
+ registryService.start();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ registryService.close();
+ log.info("RegistryService close success.");
+ }));
+ }
+
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
new file mode 100644
index 0000000000..41e8b3dd18
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+public interface RegistryService {
+
+ void start();
+
+ /**
+ * Close the registry service.
+ */
+ void close();
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
new file mode 100644
index 0000000000..196216960c
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.core.service.DistributedTaskService;
+import org.apache.streampark.console.core.service.RegistryService;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+@Slf4j
+@Service
+public class RegistryServiceImpl implements RegistryService {
+
+ private static final String REGISTRY_PATH = "/services";
+ private static final int HEARTBEAT_INTERVAL = 10000;
+ private static final int HEARTBEAT_TIMEOUT = 60000;
+
+ private String zk_address;
+ private ZooKeeper zk;
+ private String nodePath;
+ private Watcher watcher = event -> {
+ if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
+ && event.getPath().equals(REGISTRY_PATH)) {
+ handleNodeChanges();
+ }
+ };
+
+ @Getter
+ private Set currentNodes = new HashSet<>();
+
+ @Autowired
+ private DistributedTaskService distributedTaskService;
+
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
+
+ @PostConstruct
+ public void init() {
+ try {
+ zk_address = SystemPropertyUtils.get("zookeeper.address", "localhost:2181");
+ zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher);
+
+ if (zk.exists(REGISTRY_PATH, false) == null) {
+ zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ String ip = InetAddress.getLocalHost().getHostAddress();
+ String port = SystemPropertyUtils.get("server.port", "10000");
+ nodePath = zk.create(REGISTRY_PATH + "/" + ip + ":" + port, new byte[0],
+ OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+ currentNodes.add(nodePath);
+ } catch (Exception e) {
+ log.error("Failed to init ZooKeeper client", e);
+ }
+ }
+
+ @Override
+ public void start() {
+ try {
+ distributedTaskService.init(currentNodes, nodePath);
+ startHeartbeat();
+ startHeartbeatChecker();
+ handleNodeChanges();
+ } catch (Exception e) {
+ log.error("Failed to start ZooKeeper client", e);
+ }
+ }
+
+ private void startHeartbeat() {
+ scheduler.scheduleAtFixedRate(() -> {
+ try {
+ zk.setData(nodePath, new byte[0], -1);
+ log.info("Heartbeat updated for node: {}", nodePath);
+ } catch (KeeperException e) {
+ log.info("Zookeeper session expired, attempting to reconnect...");
+ reconnectAndRegister();
+ } catch (InterruptedException e) {
+ log.error("Failed to update heartbeat for node: {}", nodePath, e);
+ }
+ }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ private void startHeartbeatChecker() {
+ scheduler.scheduleAtFixedRate(() -> {
+ try {
+ long now = System.currentTimeMillis();
+ List servers = zk.getChildren(REGISTRY_PATH, false);
+ for (String server : servers) {
+ String serverPath = REGISTRY_PATH + "/" + server;
+ Stat stat = zk.exists(serverPath, false);
+ if (stat != null && (now - stat.getMtime() > HEARTBEAT_TIMEOUT)) {
+ zk.delete(serverPath, -1);
+ log.info("Deleted stale node: {}", serverPath);
+ }
+ }
+ } catch (KeeperException e) {
+ log.info("Zookeeper session expired, attempting to reconnect...");
+ reconnectAndRegister();
+ } catch (InterruptedException e) {
+ log.error("Failed to check heartbeat", e);
+ }
+ }, HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ private synchronized void handleNodeChanges() {
+ try {
+ List nodes = zk.getChildren(REGISTRY_PATH, true);
+ Set newNodes = new HashSet<>(nodes);
+
+ for (String node : newNodes) {
+ if (!currentNodes.contains(node)) {
+ log.info("Node added: {}", node);
+ distributedTaskService.addServer(node);
+ }
+ }
+
+ for (String node : currentNodes) {
+ if (!newNodes.contains(node)) {
+ log.info("Node removed: {}", node);
+ distributedTaskService.removeServer(node);
+ }
+ }
+
+ currentNodes = newNodes;
+ log.info("Online servers: {}", currentNodes);
+ } catch (KeeperException e) {
+ log.info("Zookeeper session expired, attempting to reconnect...");
+ reconnectAndRegister();
+ } catch (InterruptedException e) {
+ log.error("Failed to handle node changes", e);
+ }
+ }
+
+ private void reconnectAndRegister() {
+ int retries = 5;
+ while (retries > 0) {
+ try {
+ zk.close();
+ zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher);
+ zk.create(nodePath, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ return;
+ } catch (Exception e) {
+ retries--;
+ log.warn("Retrying connection, attempts left: {}", retries, e);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException ignored) {
+
+ }
+ }
+ }
+ log.error("Failed to reconnect and register node after multiple attempts.");
+ }
+
+ @Override
+ public void close() {
+ try {
+ zk.close();
+ scheduler.shutdown();
+ log.info("ZooKeeper client closed: {}", nodePath);
+ } catch (InterruptedException e) {
+ log.error("Failed to close ZooKeeper client", e);
+ }
+ }
+
+}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
new file mode 100644
index 0000000000..756c0911db
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.service.impl.RegistryServiceImpl;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RegistryServiceTest {
+
+ private final RegistryServiceImpl registryService = new RegistryServiceImpl();
+
+ @Test
+ public void testRegister() {
+ registryService.init();
+ Assertions.assertEquals(1, registryService.getCurrentNodes().size());
+ registryService.close();
+ }
+
+}
From 7cea308b4743664fb3c7ec32f467f568468a81f1 Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Tue, 29 Oct 2024 19:39:04 +0800
Subject: [PATCH 2/9] add registry interface comment
---
.../streampark/console/core/service/RegistryService.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
index 41e8b3dd18..1e362ad1bb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
@@ -19,6 +19,9 @@
public interface RegistryService {
+ /**
+ * Start the registry service.
+ */
void start();
/**
From 0ebcf9192226eb0eb4dd9b4b4add6dda054dab88 Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Tue, 29 Oct 2024 19:46:49 +0800
Subject: [PATCH 3/9] handle InterruptedException
---
.../console/core/service/impl/RegistryServiceImpl.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
index 196216960c..3fbc182345 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
@@ -112,6 +112,7 @@ private void startHeartbeat() {
log.info("Zookeeper session expired, attempting to reconnect...");
reconnectAndRegister();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
log.error("Failed to update heartbeat for node: {}", nodePath, e);
}
}, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
@@ -134,6 +135,7 @@ private void startHeartbeatChecker() {
log.info("Zookeeper session expired, attempting to reconnect...");
reconnectAndRegister();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
log.error("Failed to check heartbeat", e);
}
}, HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -164,6 +166,7 @@ private synchronized void handleNodeChanges() {
log.info("Zookeeper session expired, attempting to reconnect...");
reconnectAndRegister();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
log.error("Failed to handle node changes", e);
}
}
@@ -182,7 +185,7 @@ private void reconnectAndRegister() {
try {
Thread.sleep(3000);
} catch (InterruptedException ignored) {
-
+ Thread.currentThread().interrupt();
}
}
}
@@ -196,6 +199,7 @@ public void close() {
scheduler.shutdown();
log.info("ZooKeeper client closed: {}", nodePath);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
log.error("Failed to close ZooKeeper client", e);
}
}
From 0296878cc49b21eb321a8172bfc3ffa181d52d2b Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Tue, 29 Oct 2024 19:57:47 +0800
Subject: [PATCH 4/9] fix dependencies
---
tools/dependencies/known-dependencies.txt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index e685de5ad8..43696a93cf 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -377,5 +377,5 @@ xml-apis-1.4.01.jar
xnio-api-3.8.7.Final.jar
xnio-nio-3.8.7.Final.jar
xz-1.5.jar
-zookeeper-3.4.14.jar
+zookeeper-3.6.3.jar
icu4j-67.1.jar
From a7beaa5118294b36f692c822195494b7b3c19550 Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Tue, 29 Oct 2024 22:16:57 +0800
Subject: [PATCH 5/9] fix comments
---
.../src/main/assembly/conf/config.yaml | 16 ++++++-------
.../console/StreamParkConsoleBootstrap.java | 17 +++++++++----
.../console/core/service/RegistryService.java | 4 ++--
.../service/impl/RegistryServiceImpl.java | 24 +++++++++++++++----
.../core/service/RegistryServiceTest.java | 6 +++--
5 files changed, 44 insertions(+), 23 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
index a12277c625..91e843a2b9 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
+++ b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml
@@ -100,15 +100,13 @@ sso:
# Optional, change by authentication client
# Please replace and fill in your client config below when enabled SSO
-registry:
- # default using jdbc as registry
- type: jdbc
- heartbeat-refresh-interval: 1s
- session-timeout: 3s
-
-zookeeper:
- # zookeeper address
- address: localhost:2181
+high-availability:
+ enable: false # true
+ # The list of ZooKeeper quorum peers that coordinate the high-availability
+ # setup. This must be a list of the form:
+ # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+ #
+ zookeeper.quorum: 192.168.100.128:2181,192.168.100.129:2181
network:
# network interface preferred like eth0, default: empty
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
index fda4c7d827..1d3492b681 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console;
+import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.base.config.SpringProperties;
import org.apache.streampark.console.core.service.RegistryService;
@@ -64,11 +65,17 @@ public static void main(String[] args) throws Exception {
@PostConstruct
public void init() {
- registryService.start();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- registryService.close();
- log.info("RegistryService close success.");
- }));
+ if (enableHA()) {
+ registryService.startListening();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ registryService.deRegistry();
+ log.info("RegistryService close success.");
+ }));
+ }
+ }
+
+ public boolean enableHA() {
+ return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
index 1e362ad1bb..d6a0cfb196 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
@@ -22,10 +22,10 @@ public interface RegistryService {
/**
* Start the registry service.
*/
- void start();
+ void startListening();
/**
* Close the registry service.
*/
- void close();
+ void deRegistry();
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
index 3fbc182345..cab5afadf5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
@@ -71,9 +71,12 @@ public class RegistryServiceImpl implements RegistryService {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
@PostConstruct
- public void init() {
+ public void registry() {
+ if (!enableHA()) {
+ return;
+ }
try {
- zk_address = SystemPropertyUtils.get("zookeeper.address", "localhost:2181");
+ zk_address = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181");
zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher);
if (zk.exists(REGISTRY_PATH, false) == null) {
@@ -82,7 +85,8 @@ public void init() {
String ip = InetAddress.getLocalHost().getHostAddress();
String port = SystemPropertyUtils.get("server.port", "10000");
- nodePath = zk.create(REGISTRY_PATH + "/" + ip + ":" + port, new byte[0],
+ String server_id = ip + ":" + port;
+ nodePath = zk.create(REGISTRY_PATH + "/" + server_id, new byte[0],
OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
currentNodes.add(nodePath);
@@ -92,7 +96,10 @@ public void init() {
}
@Override
- public void start() {
+ public void startListening() {
+ if (!enableHA()) {
+ return;
+ }
try {
distributedTaskService.init(currentNodes, nodePath);
startHeartbeat();
@@ -193,7 +200,10 @@ private void reconnectAndRegister() {
}
@Override
- public void close() {
+ public void deRegistry() {
+ if (!enableHA()) {
+ return;
+ }
try {
zk.close();
scheduler.shutdown();
@@ -204,4 +214,8 @@ public void close() {
}
}
+ public boolean enableHA() {
+ return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
+ }
+
}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
index 756c0911db..e701b2a614 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service;
+import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.core.service.impl.RegistryServiceImpl;
import org.junit.jupiter.api.Assertions;
@@ -28,9 +29,10 @@ public class RegistryServiceTest {
@Test
public void testRegister() {
- registryService.init();
+ SystemPropertyUtils.set("high-availability.enable", "true");
+ registryService.registry();
Assertions.assertEquals(1, registryService.getCurrentNodes().size());
- registryService.close();
+ registryService.deRegistry();
}
}
From 07af79995d6c9982f04854a09a826623a58b72e4 Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Wed, 30 Oct 2024 11:35:09 +0800
Subject: [PATCH 6/9] fix commets
---
.../console/StreamParkConsoleBootstrap.java | 5 ++--
.../console/core/service/RegistryService.java | 9 ++++--
.../service/impl/RegistryServiceImpl.java | 29 ++++---------------
.../core/service/RegistryServiceTest.java | 13 ++++++---
4 files changed, 25 insertions(+), 31 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
index 1d3492b681..70ba2fc973 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
@@ -66,9 +66,10 @@ public static void main(String[] args) throws Exception {
@PostConstruct
public void init() {
if (enableHA()) {
- registryService.startListening();
+ registryService.registry();
+ registryService.doRegister();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- registryService.deRegistry();
+ registryService.unRegister();
log.info("RegistryService close success.");
}));
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
index d6a0cfb196..0b3fc4a0a8 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
@@ -19,13 +19,18 @@
public interface RegistryService {
+ /**
+ * Registry the service.
+ */
+ void registry();
+
/**
* Start the registry service.
*/
- void startListening();
+ void doRegister();
/**
* Close the registry service.
*/
- void deRegistry();
+ void unRegister();
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
index cab5afadf5..543cba6fc6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
@@ -32,8 +32,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import javax.annotation.PostConstruct;
-
import java.net.InetAddress;
import java.util.HashSet;
import java.util.List;
@@ -52,7 +50,7 @@ public class RegistryServiceImpl implements RegistryService {
private static final int HEARTBEAT_INTERVAL = 10000;
private static final int HEARTBEAT_TIMEOUT = 60000;
- private String zk_address;
+ private String zkAddress;
private ZooKeeper zk;
private String nodePath;
private Watcher watcher = event -> {
@@ -70,14 +68,10 @@ public class RegistryServiceImpl implements RegistryService {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
- @PostConstruct
public void registry() {
- if (!enableHA()) {
- return;
- }
try {
- zk_address = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181");
- zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher);
+ zkAddress = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181");
+ zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);
if (zk.exists(REGISTRY_PATH, false) == null) {
zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -96,10 +90,7 @@ public void registry() {
}
@Override
- public void startListening() {
- if (!enableHA()) {
- return;
- }
+ public void doRegister() {
try {
distributedTaskService.init(currentNodes, nodePath);
startHeartbeat();
@@ -183,7 +174,7 @@ private void reconnectAndRegister() {
while (retries > 0) {
try {
zk.close();
- zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher);
+ zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher);
zk.create(nodePath, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return;
} catch (Exception e) {
@@ -200,10 +191,7 @@ private void reconnectAndRegister() {
}
@Override
- public void deRegistry() {
- if (!enableHA()) {
- return;
- }
+ public void unRegister() {
try {
zk.close();
scheduler.shutdown();
@@ -213,9 +201,4 @@ public void deRegistry() {
log.error("Failed to close ZooKeeper client", e);
}
}
-
- public boolean enableHA() {
- return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
- }
-
}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
index e701b2a614..e624eb5a6f 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
@@ -29,10 +29,15 @@ public class RegistryServiceTest {
@Test
public void testRegister() {
- SystemPropertyUtils.set("high-availability.enable", "true");
- registryService.registry();
- Assertions.assertEquals(1, registryService.getCurrentNodes().size());
- registryService.deRegistry();
+ if (enableHA()) {
+ registryService.registry();
+ Assertions.assertEquals(1, registryService.getCurrentNodes().size());
+ registryService.unRegister();
+ }
+ }
+
+ public boolean enableHA() {
+ return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
}
}
From f2349d33c3221e3df2195d46b3358840458a428a Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Wed, 30 Oct 2024 11:37:30 +0800
Subject: [PATCH 7/9] fix dependencies
---
tools/dependencies/known-dependencies.txt | 2 ++
1 file changed, 2 insertions(+)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 43696a93cf..5bc70347e6 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -379,3 +379,5 @@ xnio-nio-3.8.7.Final.jar
xz-1.5.jar
zookeeper-3.6.3.jar
icu4j-67.1.jar
+zookeeper-jute-3.6.3.jar
+netty-transport-native-epoll-4.1.91.Final.jar
From 5ac640a6c864608ba07875984c773d6fd7a12193 Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Wed, 30 Oct 2024 12:59:11 +0800
Subject: [PATCH 8/9] fix comments
---
.../streampark/console/StreamParkConsoleBootstrap.java | 1 -
.../streampark/console/core/service/RegistryService.java | 5 -----
.../console/core/service/impl/RegistryServiceImpl.java | 4 +++-
.../console/core/service/RegistryServiceTest.java | 9 ++++++---
4 files changed, 9 insertions(+), 10 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
index 70ba2fc973..3d04bfb008 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
@@ -67,7 +67,6 @@ public static void main(String[] args) throws Exception {
public void init() {
if (enableHA()) {
registryService.registry();
- registryService.doRegister();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
registryService.unRegister();
log.info("RegistryService close success.");
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
index 0b3fc4a0a8..d504be67a4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java
@@ -24,11 +24,6 @@ public interface RegistryService {
*/
void registry();
- /**
- * Start the registry service.
- */
- void doRegister();
-
/**
* Close the registry service.
*/
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
index 543cba6fc6..f645267d78 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java
@@ -84,18 +84,20 @@ public void registry() {
OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
currentNodes.add(nodePath);
+
+ doRegister();
} catch (Exception e) {
log.error("Failed to init ZooKeeper client", e);
}
}
- @Override
public void doRegister() {
try {
distributedTaskService.init(currentNodes, nodePath);
startHeartbeat();
startHeartbeatChecker();
handleNodeChanges();
+ log.info("ZooKeeper client started: {}", nodePath);
} catch (Exception e) {
log.error("Failed to start ZooKeeper client", e);
}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
index e624eb5a6f..66e3c73a7c 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java
@@ -30,9 +30,12 @@ public class RegistryServiceTest {
@Test
public void testRegister() {
if (enableHA()) {
- registryService.registry();
- Assertions.assertEquals(1, registryService.getCurrentNodes().size());
- registryService.unRegister();
+ try {
+ registryService.registry();
+ } catch (Exception e) {
+ Assertions.assertEquals(1, registryService.getCurrentNodes().size());
+ registryService.unRegister();
+ }
}
}
From 5d7e0d9445cd62d410d09fe4d7d3f21b736c0b11 Mon Sep 17 00:00:00 2001
From: BUAAserein <18376359@buaa.edu.cn>
Date: Wed, 30 Oct 2024 13:12:10 +0800
Subject: [PATCH 9/9] refactor
---
.../console/StreamParkConsoleBootstrap.java | 24 -------------------
.../console/core/runner/EnvInitializer.java | 17 +++++++++++++
2 files changed, 17 insertions(+), 24 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
index 3d04bfb008..ad1e279e87 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
@@ -17,18 +17,13 @@
package org.apache.streampark.console;
-import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.base.config.SpringProperties;
-import org.apache.streampark.console.core.service.RegistryService;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
-import javax.annotation.PostConstruct;
-
/**
*
*
@@ -53,29 +48,10 @@
@EnableScheduling
public class StreamParkConsoleBootstrap {
- @Autowired
- private RegistryService registryService;
-
public static void main(String[] args) throws Exception {
new SpringApplicationBuilder()
.properties(SpringProperties.get())
.sources(StreamParkConsoleBootstrap.class)
.run(args);
}
-
- @PostConstruct
- public void init() {
- if (enableHA()) {
- registryService.registry();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- registryService.unRegister();
- log.info("RegistryService close success.");
- }));
- }
- }
-
- public boolean enableHA() {
- return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
- }
-
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 4e5ba38ac3..ebcaef12fb 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -26,9 +26,11 @@
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.SystemPropertyUtils;
+import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.entity.SparkEnv;
+import org.apache.streampark.console.core.service.RegistryService;
import org.apache.streampark.console.core.service.SettingService;
import org.apache.commons.lang3.StringUtils;
@@ -81,6 +83,9 @@ public void run(ApplicationArguments args) throws Exception {
// init InternalConfig
initConfig();
+ // init RegistryService
+ initRegistryService();
+
boolean isTest = Arrays.asList(context.getEnvironment().getActiveProfiles()).contains("test");
if (!isTest) {
// initialize local file system resources
@@ -110,6 +115,18 @@ private void initConfig() {
overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName);
}
+ private void initRegistryService() {
+ boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
+ if (enable) {
+ RegistryService registryService = SpringContextUtils.getBean(RegistryService.class);
+ registryService.registry();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ registryService.unRegister();
+ log.info("RegistryService unRegister success");
+ }));
+ }
+ }
+
private void overrideSystemProp(String key, String defaultValue) {
String value = context.getEnvironment().getProperty(key, defaultValue);
log.info("initialize system properties: key:{}, value:{}", key, value);