From 52ed6865575272a174e22440763fdf64b8e736e5 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Tue, 29 Oct 2024 19:33:04 +0800 Subject: [PATCH 1/9] zookeeper registry --- pom.xml | 7 + .../src/main/assembly/conf/config.yaml | 4 + .../console/StreamParkConsoleBootstrap.java | 16 ++ .../console/core/service/RegistryService.java | 28 +++ .../service/impl/RegistryServiceImpl.java | 203 ++++++++++++++++++ .../core/service/RegistryServiceTest.java | 36 ++++ 6 files changed, 294 insertions(+) create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java create mode 100644 streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java create mode 100644 streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java diff --git a/pom.xml b/pom.xml index 0c9fdbc156..4a65812bae 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ 3.3.4 2.1.10 3.3.0 + 3.6.3 6.2.3 2.17 2.5.0 @@ -242,6 +243,12 @@ test + + org.apache.zookeeper + zookeeper + ${zoopkeeper.version} + + redis.clients jedis diff --git a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml index 084e991121..a12277c625 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml +++ b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml @@ -106,6 +106,10 @@ registry: heartbeat-refresh-interval: 1s session-timeout: 3s +zookeeper: + # zookeeper address + address: localhost:2181 + network: # network interface preferred like eth0, default: empty preferred-interface: "" diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java index 601fc7f00f..fda4c7d827 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java @@ -18,12 +18,16 @@ package org.apache.streampark.console; import org.apache.streampark.console.base.config.SpringProperties; +import org.apache.streampark.console.core.service.RegistryService; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.scheduling.annotation.EnableScheduling; +import javax.annotation.PostConstruct; + /** * * @@ -48,6 +52,9 @@ @EnableScheduling public class StreamParkConsoleBootstrap { + @Autowired + private RegistryService registryService; + public static void main(String[] args) throws Exception { new SpringApplicationBuilder() .properties(SpringProperties.get()) @@ -55,4 +62,13 @@ public static void main(String[] args) throws Exception { .run(args); } + @PostConstruct + public void init() { + registryService.start(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + registryService.close(); + log.info("RegistryService close success."); + })); + } + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java new file mode 100644 index 0000000000..41e8b3dd18 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +public interface RegistryService { + + void start(); + + /** + * Close the registry service. + */ + void close(); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java new file mode 100644 index 0000000000..196216960c --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.impl; + +import org.apache.streampark.common.util.SystemPropertyUtils; +import org.apache.streampark.console.core.service.DistributedTaskService; +import org.apache.streampark.console.core.service.RegistryService; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; + +import java.net.InetAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE; + +@Slf4j +@Service +public class RegistryServiceImpl implements RegistryService { + + private static final String REGISTRY_PATH = "/services"; + private static final int HEARTBEAT_INTERVAL = 10000; + private static final int HEARTBEAT_TIMEOUT = 60000; + + private String zk_address; + private ZooKeeper zk; + private String nodePath; + private Watcher watcher = event -> { + if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged + && event.getPath().equals(REGISTRY_PATH)) { + handleNodeChanges(); + } + }; + + @Getter + private Set currentNodes = new HashSet<>(); + + @Autowired + private DistributedTaskService distributedTaskService; + + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); + + @PostConstruct + public void init() { + try { + zk_address = SystemPropertyUtils.get("zookeeper.address", "localhost:2181"); + zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher); + + if (zk.exists(REGISTRY_PATH, false) == null) { + zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + String ip = InetAddress.getLocalHost().getHostAddress(); + String port = SystemPropertyUtils.get("server.port", "10000"); + nodePath = zk.create(REGISTRY_PATH + "/" + ip + ":" + port, new byte[0], + OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + + currentNodes.add(nodePath); + } catch (Exception e) { + log.error("Failed to init ZooKeeper client", e); + } + } + + @Override + public void start() { + try { + distributedTaskService.init(currentNodes, nodePath); + startHeartbeat(); + startHeartbeatChecker(); + handleNodeChanges(); + } catch (Exception e) { + log.error("Failed to start ZooKeeper client", e); + } + } + + private void startHeartbeat() { + scheduler.scheduleAtFixedRate(() -> { + try { + zk.setData(nodePath, new byte[0], -1); + log.info("Heartbeat updated for node: {}", nodePath); + } catch (KeeperException e) { + log.info("Zookeeper session expired, attempting to reconnect..."); + reconnectAndRegister(); + } catch (InterruptedException e) { + log.error("Failed to update heartbeat for node: {}", nodePath, e); + } + }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS); + } + + private void startHeartbeatChecker() { + scheduler.scheduleAtFixedRate(() -> { + try { + long now = System.currentTimeMillis(); + List servers = zk.getChildren(REGISTRY_PATH, false); + for (String server : servers) { + String serverPath = REGISTRY_PATH + "/" + server; + Stat stat = zk.exists(serverPath, false); + if (stat != null && (now - stat.getMtime() > HEARTBEAT_TIMEOUT)) { + zk.delete(serverPath, -1); + log.info("Deleted stale node: {}", serverPath); + } + } + } catch (KeeperException e) { + log.info("Zookeeper session expired, attempting to reconnect..."); + reconnectAndRegister(); + } catch (InterruptedException e) { + log.error("Failed to check heartbeat", e); + } + }, HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS); + } + + private synchronized void handleNodeChanges() { + try { + List nodes = zk.getChildren(REGISTRY_PATH, true); + Set newNodes = new HashSet<>(nodes); + + for (String node : newNodes) { + if (!currentNodes.contains(node)) { + log.info("Node added: {}", node); + distributedTaskService.addServer(node); + } + } + + for (String node : currentNodes) { + if (!newNodes.contains(node)) { + log.info("Node removed: {}", node); + distributedTaskService.removeServer(node); + } + } + + currentNodes = newNodes; + log.info("Online servers: {}", currentNodes); + } catch (KeeperException e) { + log.info("Zookeeper session expired, attempting to reconnect..."); + reconnectAndRegister(); + } catch (InterruptedException e) { + log.error("Failed to handle node changes", e); + } + } + + private void reconnectAndRegister() { + int retries = 5; + while (retries > 0) { + try { + zk.close(); + zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher); + zk.create(nodePath, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + return; + } catch (Exception e) { + retries--; + log.warn("Retrying connection, attempts left: {}", retries, e); + try { + Thread.sleep(3000); + } catch (InterruptedException ignored) { + + } + } + } + log.error("Failed to reconnect and register node after multiple attempts."); + } + + @Override + public void close() { + try { + zk.close(); + scheduler.shutdown(); + log.info("ZooKeeper client closed: {}", nodePath); + } catch (InterruptedException e) { + log.error("Failed to close ZooKeeper client", e); + } + } + +} diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java new file mode 100644 index 0000000000..756c0911db --- /dev/null +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.service.impl.RegistryServiceImpl; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class RegistryServiceTest { + + private final RegistryServiceImpl registryService = new RegistryServiceImpl(); + + @Test + public void testRegister() { + registryService.init(); + Assertions.assertEquals(1, registryService.getCurrentNodes().size()); + registryService.close(); + } + +} From 7cea308b4743664fb3c7ec32f467f568468a81f1 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Tue, 29 Oct 2024 19:39:04 +0800 Subject: [PATCH 2/9] add registry interface comment --- .../streampark/console/core/service/RegistryService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java index 41e8b3dd18..1e362ad1bb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java @@ -19,6 +19,9 @@ public interface RegistryService { + /** + * Start the registry service. + */ void start(); /** From 0ebcf9192226eb0eb4dd9b4b4add6dda054dab88 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Tue, 29 Oct 2024 19:46:49 +0800 Subject: [PATCH 3/9] handle InterruptedException --- .../console/core/service/impl/RegistryServiceImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java index 196216960c..3fbc182345 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java @@ -112,6 +112,7 @@ private void startHeartbeat() { log.info("Zookeeper session expired, attempting to reconnect..."); reconnectAndRegister(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error("Failed to update heartbeat for node: {}", nodePath, e); } }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS); @@ -134,6 +135,7 @@ private void startHeartbeatChecker() { log.info("Zookeeper session expired, attempting to reconnect..."); reconnectAndRegister(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error("Failed to check heartbeat", e); } }, HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS); @@ -164,6 +166,7 @@ private synchronized void handleNodeChanges() { log.info("Zookeeper session expired, attempting to reconnect..."); reconnectAndRegister(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error("Failed to handle node changes", e); } } @@ -182,7 +185,7 @@ private void reconnectAndRegister() { try { Thread.sleep(3000); } catch (InterruptedException ignored) { - + Thread.currentThread().interrupt(); } } } @@ -196,6 +199,7 @@ public void close() { scheduler.shutdown(); log.info("ZooKeeper client closed: {}", nodePath); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error("Failed to close ZooKeeper client", e); } } From 0296878cc49b21eb321a8172bfc3ffa181d52d2b Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Tue, 29 Oct 2024 19:57:47 +0800 Subject: [PATCH 4/9] fix dependencies --- tools/dependencies/known-dependencies.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index e685de5ad8..43696a93cf 100644 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -377,5 +377,5 @@ xml-apis-1.4.01.jar xnio-api-3.8.7.Final.jar xnio-nio-3.8.7.Final.jar xz-1.5.jar -zookeeper-3.4.14.jar +zookeeper-3.6.3.jar icu4j-67.1.jar From a7beaa5118294b36f692c822195494b7b3c19550 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Tue, 29 Oct 2024 22:16:57 +0800 Subject: [PATCH 5/9] fix comments --- .../src/main/assembly/conf/config.yaml | 16 ++++++------- .../console/StreamParkConsoleBootstrap.java | 17 +++++++++---- .../console/core/service/RegistryService.java | 4 ++-- .../service/impl/RegistryServiceImpl.java | 24 +++++++++++++++---- .../core/service/RegistryServiceTest.java | 6 +++-- 5 files changed, 44 insertions(+), 23 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml index a12277c625..91e843a2b9 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml +++ b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml @@ -100,15 +100,13 @@ sso: # Optional, change by authentication client # Please replace and fill in your client config below when enabled SSO -registry: - # default using jdbc as registry - type: jdbc - heartbeat-refresh-interval: 1s - session-timeout: 3s - -zookeeper: - # zookeeper address - address: localhost:2181 +high-availability: + enable: false # true + # The list of ZooKeeper quorum peers that coordinate the high-availability + # setup. This must be a list of the form: + # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) + # + zookeeper.quorum: 192.168.100.128:2181,192.168.100.129:2181 network: # network interface preferred like eth0, default: empty diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java index fda4c7d827..1d3492b681 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java @@ -17,6 +17,7 @@ package org.apache.streampark.console; +import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.streampark.console.base.config.SpringProperties; import org.apache.streampark.console.core.service.RegistryService; @@ -64,11 +65,17 @@ public static void main(String[] args) throws Exception { @PostConstruct public void init() { - registryService.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - registryService.close(); - log.info("RegistryService close success."); - })); + if (enableHA()) { + registryService.startListening(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + registryService.deRegistry(); + log.info("RegistryService close success."); + })); + } + } + + public boolean enableHA() { + return SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java index 1e362ad1bb..d6a0cfb196 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java @@ -22,10 +22,10 @@ public interface RegistryService { /** * Start the registry service. */ - void start(); + void startListening(); /** * Close the registry service. */ - void close(); + void deRegistry(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java index 3fbc182345..cab5afadf5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java @@ -71,9 +71,12 @@ public class RegistryServiceImpl implements RegistryService { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); @PostConstruct - public void init() { + public void registry() { + if (!enableHA()) { + return; + } try { - zk_address = SystemPropertyUtils.get("zookeeper.address", "localhost:2181"); + zk_address = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181"); zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher); if (zk.exists(REGISTRY_PATH, false) == null) { @@ -82,7 +85,8 @@ public void init() { String ip = InetAddress.getLocalHost().getHostAddress(); String port = SystemPropertyUtils.get("server.port", "10000"); - nodePath = zk.create(REGISTRY_PATH + "/" + ip + ":" + port, new byte[0], + String server_id = ip + ":" + port; + nodePath = zk.create(REGISTRY_PATH + "/" + server_id, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); currentNodes.add(nodePath); @@ -92,7 +96,10 @@ public void init() { } @Override - public void start() { + public void startListening() { + if (!enableHA()) { + return; + } try { distributedTaskService.init(currentNodes, nodePath); startHeartbeat(); @@ -193,7 +200,10 @@ private void reconnectAndRegister() { } @Override - public void close() { + public void deRegistry() { + if (!enableHA()) { + return; + } try { zk.close(); scheduler.shutdown(); @@ -204,4 +214,8 @@ public void close() { } } + public boolean enableHA() { + return SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); + } + } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java index 756c0911db..e701b2a614 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.service; +import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.streampark.console.core.service.impl.RegistryServiceImpl; import org.junit.jupiter.api.Assertions; @@ -28,9 +29,10 @@ public class RegistryServiceTest { @Test public void testRegister() { - registryService.init(); + SystemPropertyUtils.set("high-availability.enable", "true"); + registryService.registry(); Assertions.assertEquals(1, registryService.getCurrentNodes().size()); - registryService.close(); + registryService.deRegistry(); } } From 07af79995d6c9982f04854a09a826623a58b72e4 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Wed, 30 Oct 2024 11:35:09 +0800 Subject: [PATCH 6/9] fix commets --- .../console/StreamParkConsoleBootstrap.java | 5 ++-- .../console/core/service/RegistryService.java | 9 ++++-- .../service/impl/RegistryServiceImpl.java | 29 ++++--------------- .../core/service/RegistryServiceTest.java | 13 ++++++--- 4 files changed, 25 insertions(+), 31 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java index 1d3492b681..70ba2fc973 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java @@ -66,9 +66,10 @@ public static void main(String[] args) throws Exception { @PostConstruct public void init() { if (enableHA()) { - registryService.startListening(); + registryService.registry(); + registryService.doRegister(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { - registryService.deRegistry(); + registryService.unRegister(); log.info("RegistryService close success."); })); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java index d6a0cfb196..0b3fc4a0a8 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java @@ -19,13 +19,18 @@ public interface RegistryService { + /** + * Registry the service. + */ + void registry(); + /** * Start the registry service. */ - void startListening(); + void doRegister(); /** * Close the registry service. */ - void deRegistry(); + void unRegister(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java index cab5afadf5..543cba6fc6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java @@ -32,8 +32,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; - import java.net.InetAddress; import java.util.HashSet; import java.util.List; @@ -52,7 +50,7 @@ public class RegistryServiceImpl implements RegistryService { private static final int HEARTBEAT_INTERVAL = 10000; private static final int HEARTBEAT_TIMEOUT = 60000; - private String zk_address; + private String zkAddress; private ZooKeeper zk; private String nodePath; private Watcher watcher = event -> { @@ -70,14 +68,10 @@ public class RegistryServiceImpl implements RegistryService { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); - @PostConstruct public void registry() { - if (!enableHA()) { - return; - } try { - zk_address = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181"); - zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher); + zkAddress = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181"); + zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher); if (zk.exists(REGISTRY_PATH, false) == null) { zk.create(REGISTRY_PATH, new byte[0], OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -96,10 +90,7 @@ public void registry() { } @Override - public void startListening() { - if (!enableHA()) { - return; - } + public void doRegister() { try { distributedTaskService.init(currentNodes, nodePath); startHeartbeat(); @@ -183,7 +174,7 @@ private void reconnectAndRegister() { while (retries > 0) { try { zk.close(); - zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher); + zk = new ZooKeeper(zkAddress, HEARTBEAT_TIMEOUT, watcher); zk.create(nodePath, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); return; } catch (Exception e) { @@ -200,10 +191,7 @@ private void reconnectAndRegister() { } @Override - public void deRegistry() { - if (!enableHA()) { - return; - } + public void unRegister() { try { zk.close(); scheduler.shutdown(); @@ -213,9 +201,4 @@ public void deRegistry() { log.error("Failed to close ZooKeeper client", e); } } - - public boolean enableHA() { - return SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); - } - } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java index e701b2a614..e624eb5a6f 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java @@ -29,10 +29,15 @@ public class RegistryServiceTest { @Test public void testRegister() { - SystemPropertyUtils.set("high-availability.enable", "true"); - registryService.registry(); - Assertions.assertEquals(1, registryService.getCurrentNodes().size()); - registryService.deRegistry(); + if (enableHA()) { + registryService.registry(); + Assertions.assertEquals(1, registryService.getCurrentNodes().size()); + registryService.unRegister(); + } + } + + public boolean enableHA() { + return SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); } } From f2349d33c3221e3df2195d46b3358840458a428a Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Wed, 30 Oct 2024 11:37:30 +0800 Subject: [PATCH 7/9] fix dependencies --- tools/dependencies/known-dependencies.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 43696a93cf..5bc70347e6 100644 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -379,3 +379,5 @@ xnio-nio-3.8.7.Final.jar xz-1.5.jar zookeeper-3.6.3.jar icu4j-67.1.jar +zookeeper-jute-3.6.3.jar +netty-transport-native-epoll-4.1.91.Final.jar From 5ac640a6c864608ba07875984c773d6fd7a12193 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Wed, 30 Oct 2024 12:59:11 +0800 Subject: [PATCH 8/9] fix comments --- .../streampark/console/StreamParkConsoleBootstrap.java | 1 - .../streampark/console/core/service/RegistryService.java | 5 ----- .../console/core/service/impl/RegistryServiceImpl.java | 4 +++- .../console/core/service/RegistryServiceTest.java | 9 ++++++--- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java index 70ba2fc973..3d04bfb008 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java @@ -67,7 +67,6 @@ public static void main(String[] args) throws Exception { public void init() { if (enableHA()) { registryService.registry(); - registryService.doRegister(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { registryService.unRegister(); log.info("RegistryService close success."); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java index 0b3fc4a0a8..d504be67a4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java @@ -24,11 +24,6 @@ public interface RegistryService { */ void registry(); - /** - * Start the registry service. - */ - void doRegister(); - /** * Close the registry service. */ diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java index 543cba6fc6..f645267d78 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java @@ -84,18 +84,20 @@ public void registry() { OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); currentNodes.add(nodePath); + + doRegister(); } catch (Exception e) { log.error("Failed to init ZooKeeper client", e); } } - @Override public void doRegister() { try { distributedTaskService.init(currentNodes, nodePath); startHeartbeat(); startHeartbeatChecker(); handleNodeChanges(); + log.info("ZooKeeper client started: {}", nodePath); } catch (Exception e) { log.error("Failed to start ZooKeeper client", e); } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java index e624eb5a6f..66e3c73a7c 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java @@ -30,9 +30,12 @@ public class RegistryServiceTest { @Test public void testRegister() { if (enableHA()) { - registryService.registry(); - Assertions.assertEquals(1, registryService.getCurrentNodes().size()); - registryService.unRegister(); + try { + registryService.registry(); + } catch (Exception e) { + Assertions.assertEquals(1, registryService.getCurrentNodes().size()); + registryService.unRegister(); + } } } From 5d7e0d9445cd62d410d09fe4d7d3f21b736c0b11 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Wed, 30 Oct 2024 13:12:10 +0800 Subject: [PATCH 9/9] refactor --- .../console/StreamParkConsoleBootstrap.java | 24 ------------------- .../console/core/runner/EnvInitializer.java | 17 +++++++++++++ 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java index 3d04bfb008..ad1e279e87 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java @@ -17,18 +17,13 @@ package org.apache.streampark.console; -import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.streampark.console.base.config.SpringProperties; -import org.apache.streampark.console.core.service.RegistryService; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.scheduling.annotation.EnableScheduling; -import javax.annotation.PostConstruct; - /** * * @@ -53,29 +48,10 @@ @EnableScheduling public class StreamParkConsoleBootstrap { - @Autowired - private RegistryService registryService; - public static void main(String[] args) throws Exception { new SpringApplicationBuilder() .properties(SpringProperties.get()) .sources(StreamParkConsoleBootstrap.class) .run(args); } - - @PostConstruct - public void init() { - if (enableHA()) { - registryService.registry(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - registryService.unRegister(); - log.info("RegistryService close success."); - })); - } - } - - public boolean enableHA() { - return SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); - } - } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index 4e5ba38ac3..ebcaef12fb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -26,9 +26,11 @@ import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.AssertUtils; import org.apache.streampark.common.util.SystemPropertyUtils; +import org.apache.streampark.console.base.util.SpringContextUtils; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.entity.SparkEnv; +import org.apache.streampark.console.core.service.RegistryService; import org.apache.streampark.console.core.service.SettingService; import org.apache.commons.lang3.StringUtils; @@ -81,6 +83,9 @@ public void run(ApplicationArguments args) throws Exception { // init InternalConfig initConfig(); + // init RegistryService + initRegistryService(); + boolean isTest = Arrays.asList(context.getEnvironment().getActiveProfiles()).contains("test"); if (!isTest) { // initialize local file system resources @@ -110,6 +115,18 @@ private void initConfig() { overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName); } + private void initRegistryService() { + boolean enable = SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); + if (enable) { + RegistryService registryService = SpringContextUtils.getBean(RegistryService.class); + registryService.registry(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + registryService.unRegister(); + log.info("RegistryService unRegister success"); + })); + } + } + private void overrideSystemProp(String key, String defaultValue) { String value = context.getEnvironment().getProperty(key, defaultValue); log.info("initialize system properties: key:{}, value:{}", key, value);