children(String key) {
+ checkNotNull(key);
+ return raftRegistryClient.getRegistryDataChildren(key);
+ }
+
+ @Override
+ public boolean exists(String key) {
+ checkNotNull(key);
+ return raftRegistryClient.existRaftRegistryDataKey(key);
+ }
+
+ @Override
+ public boolean acquireLock(String key) {
+ checkNotNull(key);
+ return raftRegistryClient.acquireRaftRegistryLock(key);
+ }
+
+ @Override
+ public boolean acquireLock(String key, long timeout) {
+ checkNotNull(key);
+ return raftRegistryClient.acquireRaftRegistryLock(key, timeout);
+ }
+
+ @Override
+ public boolean releaseLock(String key) {
+ checkNotNull(key);
+ return raftRegistryClient.releaseRaftRegistryLock(key);
+ }
+
+ @Override
+ public void close() {
+ raftRegistryClient.close();
+ }
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryAutoConfiguration.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryAutoConfiguration.java
new file mode 100644
index 000000000000..0da871c42f15
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryAutoConfiguration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import org.apache.dolphinscheduler.plugin.registry.raft.server.RaftRegistryServer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
+
+@Slf4j
+@Configuration(proxyBeanMethods = false)
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+@EnableConfigurationProperties(RaftRegistryProperties.class)
+public class RaftRegistryAutoConfiguration {
+
+ public RaftRegistryAutoConfiguration() {
+ log.info("Load RaftRegistryAutoConfiguration");
+ }
+
+ @Bean
+ public RaftRegistryServer raftRegistryServer(RaftRegistryProperties raftRegistryProperties) {
+ RaftRegistryServer raftRegistryServer = new RaftRegistryServer(raftRegistryProperties);
+ raftRegistryServer.start();
+ return raftRegistryServer;
+ }
+
+ @Bean
+ @DependsOn("raftRegistryServer")
+ public RaftRegistry raftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
+ RaftRegistry raftRegistry = new RaftRegistry(raftRegistryProperties);
+ raftRegistry.start();
+ return raftRegistry;
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java
new file mode 100644
index 000000000000..a7d9442ace22
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryProperties.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import lombok.Data;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+@Data
+@Configuration
+@ConditionalOnProperty(prefix = "registry", name = "type", havingValue = "raft")
+@ConfigurationProperties(prefix = "registry")
+public class RaftRegistryProperties {
+
+ private String clusterName;
+ private String serverAddressList;
+ private String serverAddress;
+ private int serverPort;
+ private String logStorageDir;
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegistryClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegistryClient.java
new file mode 100644
index 000000000000..3a6fd29d39d3
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/IRaftRegistryClient.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.client;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.util.Collection;
+
+public interface IRaftRegistryClient extends AutoCloseable {
+
+ /**
+ * Start the raft registry client. Once started, the client will connect to the raft registry server and then it can be used.
+ */
+ void start();
+
+ /**
+ * Check the connectivity of the client.
+ *
+ * @return true if the client is connected, false otherwise
+ */
+ boolean isConnectivity();
+
+ /**
+ * Subscribe to the raft registry connection state change event.
+ *
+ * @param connectionStateListener the listener to handle connection state changes
+ */
+ void subscribeConnectionStateChange(ConnectionListener connectionStateListener);
+
+ /**
+ * Subscribe to the register data change event.
+ *
+ * @param path the path to subscribe to
+ * @param listener the listener to handle data changes
+ */
+ void subscribeRaftRegistryDataChange(String path, SubscribeListener listener);
+
+ /**
+ * Get the raft register data by key.
+ *
+ * @param key the key of the register data
+ * @return the value associated with the key
+ */
+ String getRegistryDataByKey(String key);
+
+ /**
+ * Put the register data to the raft registry server.
+ *
+ * If the key already exists, then update the value. If the key does not exist, then insert a new key-value pair.
+ *
+ * @param key the key of the register data
+ * @param value the value to be associated with the key
+ * @param deleteOnDisconnect if true, the data will be deleted when the client disconnects
+ */
+ void putRegistryData(String key, String value, boolean deleteOnDisconnect);
+
+ /**
+ * Delete the register data by key.
+ *
+ * @param key the key of the register data to be deleted
+ */
+ void deleteRegistryDataByKey(String key);
+
+ /**
+ * List all the children of the given key.
+ *
+ * e.g. key = "/dolphinscheduler/master", and data exists in db as "/dolphinscheduler/master/master1", "/dolphinscheduler/master/master2"
+ *
+ * then the return value will be ["master1", "master2"]
+ *
+ * @param key the key whose children are to be listed
+ * @return a collection of children keys
+ */
+ Collection getRegistryDataChildren(String key);
+
+ /**
+ * Check if the key exists in the raft registry server.
+ *
+ * @param key the key to check
+ * @return true if the key exists, false otherwise
+ */
+ boolean existRaftRegistryDataKey(String key);
+
+ /**
+ * Acquire the raft registry lock by key. This is a blocking method. If you want to stop the blocking, you can interrupt the thread.
+ *
+ * @param lockKey the key of the lock to be acquired
+ * @return true if the lock was successfully acquired, false otherwise
+ */
+ boolean acquireRaftRegistryLock(String lockKey);
+
+ /**
+ * Acquire the raft registry lock by key until timeout.
+ *
+ * @param lockKey the key of the lock to be acquired
+ * @param timeout the maximum time to wait for the lock
+ * @return true if the lock was successfully acquired, false otherwise
+ */
+ boolean acquireRaftRegistryLock(String lockKey, long timeout);
+
+ /**
+ * Release the raft registry lock by key. If the lockKey does not exist, this method will do nothing.
+ *
+ * @param lockKey the key of the lock to be released
+ * @return true if the lock was successfully released, false otherwise
+ */
+ boolean releaseRaftRegistryLock(String lockKey);
+
+ /**
+ * Close the raft registry client. Once the client is closed, it cannot work anymore.
+ */
+ @Override
+ void close();
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegistryClient.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegistryClient.java
new file mode 100644
index 000000000000..4c84abcc7ca5
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/client/RaftRegistryClient.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.client;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+import static com.alipay.sofa.jraft.util.BytesUtil.writeUtf8;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
+import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftConnectionStateManager;
+import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftLockManager;
+import org.apache.dolphinscheduler.plugin.registry.raft.manage.IRaftSubscribeDataManager;
+import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftConnectionStateManager;
+import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftLockManager;
+import org.apache.dolphinscheduler.plugin.registry.raft.manage.RaftSubscribeDataManager;
+import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeType;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore;
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.options.PlacementDriverOptions;
+import com.alipay.sofa.jraft.rhea.options.RegionRouteTableOptions;
+import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
+import com.alipay.sofa.jraft.rhea.options.configured.MultiRegionRouteTableOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.PlacementDriverOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RheaKVStoreOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.storage.KVEntry;
+
+@Slf4j
+public class RaftRegistryClient implements IRaftRegistryClient {
+
+ private final RheaKVStore rheaKvStore;
+ private final RaftRegistryProperties raftRegistryProperties;
+ private final IRaftConnectionStateManager raftConnectionStateManager;
+ private final IRaftSubscribeDataManager raftSubscribeDataManager;
+ private final IRaftLockManager raftLockManager;
+ private volatile boolean started;
+ public RaftRegistryClient(RaftRegistryProperties raftRegistryProperties) {
+ this.raftRegistryProperties = raftRegistryProperties;
+ this.rheaKvStore = new DefaultRheaKVStore();
+ this.raftConnectionStateManager = new RaftConnectionStateManager(raftRegistryProperties);
+ this.raftSubscribeDataManager = new RaftSubscribeDataManager(rheaKvStore);
+ this.raftLockManager = new RaftLockManager(rheaKvStore);
+
+ initRheakv();
+ }
+
+ private void initRheakv() {
+ final List regionRouteTableOptionsList = MultiRegionRouteTableOptionsConfigured
+ .newConfigured()
+ .withInitialServerList(-1L /* default id */, raftRegistryProperties.getServerAddressList())
+ .config();
+ final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
+ .withFake(true)
+ .withRegionRouteTableOptionsList(regionRouteTableOptionsList)
+ .config();
+ final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured()
+ .withClusterName(raftRegistryProperties.getClusterName())
+ .withPlacementDriverOptions(pdOpts)
+ .config();
+ this.rheaKvStore.init(opts);
+ }
+
+ @Override
+ public void start() {
+ if (this.started) {
+ log.info("RaftRegistryClient is already started");
+ return;
+ }
+ log.info("starting raft client registry...");
+ raftSubscribeDataManager.start();
+ raftConnectionStateManager.start();
+ this.started = true;
+ log.info("raft client registry started successfully");
+ }
+
+ @Override
+ public boolean isConnectivity() {
+ return raftConnectionStateManager.getConnectionState() == ConnectionState.CONNECTED;
+ }
+
+ @Override
+ public void subscribeConnectionStateChange(ConnectionListener connectionStateListener) {
+ raftConnectionStateManager.addConnectionListener(connectionStateListener);
+ }
+
+ @Override
+ public void subscribeRaftRegistryDataChange(String path, SubscribeListener listener) {
+ raftSubscribeDataManager.addDataSubscribeListener(path, listener);
+ }
+
+ @Override
+ public String getRegistryDataByKey(String key) {
+ String compositeValue = readUtf8(rheaKvStore.bGet(key));
+ if (compositeValue == null) {
+ throw new RegistryException("key does not exist:" + key);
+ }
+ String[] nodeTypeAndValue = compositeValue.split(Constants.AT_SIGN);
+ if (nodeTypeAndValue.length != 2) {
+ throw new RegistryException("value format is incorrect for key: " + key + ", value: " + compositeValue);
+ }
+ return nodeTypeAndValue[1];
+ }
+
+ @Override
+ public void putRegistryData(String key, String value, boolean deleteOnDisconnect) {
+ NodeType nodeType = deleteOnDisconnect ? NodeType.EPHEMERAL : NodeType.PERSISTENT;
+ String compositeValue = nodeType.getName() + Constants.AT_SIGN + value;
+ rheaKvStore.bPut(key, writeUtf8(compositeValue));
+ }
+
+ @Override
+ public void deleteRegistryDataByKey(String key) {
+ rheaKvStore.bDelete(key);
+ }
+
+ @Override
+ public Collection getRegistryDataChildren(String key) {
+ String basePath = null;
+ if (key.startsWith(RegistryNodeType.MASTER.getRegistryPath())) {
+ basePath = RegistryNodeType.MASTER.getRegistryPath();
+ } else if (key.startsWith(RegistryNodeType.WORKER.getRegistryPath())) {
+ basePath = RegistryNodeType.WORKER.getRegistryPath();
+ } else if (key.startsWith(RegistryNodeType.ALERT_SERVER.getRegistryPath())) {
+ basePath = RegistryNodeType.ALERT_SERVER.getRegistryPath();
+ } else {
+ throw new UnsupportedOperationException("unsupported get registry data children by key:" + key);
+ }
+ List kvEntries = rheaKvStore.bScan(basePath + Constants.SINGLE_SLASH,
+ basePath + Constants.SINGLE_SLASH + Constants.RAFT_END_KEY);
+ return getRegistryList(kvEntries);
+ }
+
+ @Override
+ public boolean existRaftRegistryDataKey(String key) {
+ return rheaKvStore.bContainsKey(key);
+ }
+
+ private Collection getRegistryList(List kvEntries) {
+ if (kvEntries == null || kvEntries.isEmpty()) {
+ return new ArrayList<>();
+ }
+ List registryList = new ArrayList<>();
+ for (KVEntry kvEntry : kvEntries) {
+ String entryKey = readUtf8(kvEntry.getKey());
+ String childKey = entryKey.substring(entryKey.lastIndexOf(Constants.SINGLE_SLASH) + 1);
+ registryList.add(childKey);
+ }
+ return registryList;
+ }
+
+ @Override
+ public boolean acquireRaftRegistryLock(String lockKey) {
+ try {
+ return raftLockManager.acquireLock(lockKey);
+ } catch (Exception ex) {
+ log.error("acquire raft registry lock error", ex);
+ raftLockManager.releaseLock(lockKey);
+ throw new RegistryException("acquire raft registry lock error: " + lockKey, ex);
+ }
+ }
+
+ @Override
+ public boolean acquireRaftRegistryLock(String lockKey, long timeout) {
+ try {
+ return raftLockManager.acquireLock(lockKey, timeout);
+ } catch (Exception ex) {
+ log.error("acquire raft registry lock error", ex);
+ raftLockManager.releaseLock(lockKey);
+ throw new RegistryException("acquire raft registry lock error: " + lockKey, ex);
+ }
+ }
+
+ @Override
+ public boolean releaseRaftRegistryLock(String lockKey) {
+ try {
+ return raftLockManager.releaseLock(lockKey);
+ } catch (Exception ex) {
+ log.error("release raft registry lock error", ex);
+ throw new RegistryException("release raft registry lock error, lockKey:" + lockKey, ex);
+ }
+ }
+
+ @Override
+ public void close() {
+ log.info("ready to close raft registry client");
+ if (rheaKvStore != null) {
+ rheaKvStore.shutdown();
+ }
+ this.started = false;
+ log.info("closed raft registry client");
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftConnectionStateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftConnectionStateManager.java
new file mode 100644
index 000000000000..f5c737b21231
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftConnectionStateManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.manage;
+
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+/**
+ * Interface for managing the connection state in a raft registry client.
+ */
+public interface IRaftConnectionStateManager extends AutoCloseable {
+
+ /**
+ * Starts the connection state manager.
+ * This method initializes and starts monitoring the connection state.
+ */
+ void start();
+
+ /**
+ * Adds a connection listener to listen for connection state changes.
+ *
+ * @param listener the listener to be added for connection state changes
+ */
+ void addConnectionListener(ConnectionListener listener);
+
+ /**
+ * Retrieves the current connection state.
+ *
+ * @return the current connection state
+ */
+ ConnectionState getConnectionState();
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftLockManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftLockManager.java
new file mode 100644
index 000000000000..edd264dcc68f
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftLockManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.manage;
+
+/**
+ * Interface for managing locks in a raft registry client.
+ */
+public interface IRaftLockManager extends AutoCloseable {
+
+ /**
+ * Acquires a lock with the specified key.
+ * This method blocks until the lock is acquired.
+ *
+ * @param lockKey the key for the lock
+ * @return true if the lock was successfully acquired, false otherwise
+ */
+ boolean acquireLock(String lockKey);
+
+ /**
+ * Acquires a lock with the specified key, with a timeout.
+ * This method blocks until the lock is acquired or the timeout is reached.
+ *
+ * @param lockKey the key for the lock
+ * @param timeout the maximum time to wait for the lock in milliseconds
+ * @return true if the lock was successfully acquired within the timeout, false otherwise
+ */
+ boolean acquireLock(String lockKey, long timeout);
+
+ /**
+ * Releases the lock with the specified key.
+ *
+ * @param lockKey the key for the lock
+ * @return true if the lock was successfully released, false otherwise
+ */
+ boolean releaseLock(String lockKey);
+
+ /**
+ * Closes the lock manager and releases any resources held by it.
+ *
+ * @throws Exception if an error occurs while closing the lock manager
+ */
+ @Override
+ void close() throws Exception;
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftSubscribeDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftSubscribeDataManager.java
new file mode 100644
index 000000000000..21f18110c1a8
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/IRaftSubscribeDataManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.manage;
+
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+/**
+ * Interface for managing data subscriptions in a raft registry client.
+ */
+public interface IRaftSubscribeDataManager extends AutoCloseable {
+
+ /**
+ * Starts the data subscription manager.
+ * This method initializes and starts the data subscription functionality.
+ */
+ void start();
+
+ /**
+ * Adds a listener to subscribe to data changes at the specified path.
+ *
+ * @param path the path to subscribe to for data changes
+ * @param listener the listener to be notified of data changes
+ */
+ void addDataSubscribeListener(String path, SubscribeListener listener);
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftConnectionStateManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftConnectionStateManager.java
new file mode 100644
index 000000000000..93073cf34fc8
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftConnectionStateManager.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.manage;
+
+import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
+import org.apache.dolphinscheduler.registry.api.ConnectionListener;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.alipay.sofa.jraft.RouteTable;
+import com.alipay.sofa.jraft.option.CliOptions;
+import com.alipay.sofa.jraft.rpc.CliClientService;
+import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@Slf4j
+public class RaftConnectionStateManager implements IRaftConnectionStateManager {
+
+ private static final String DEFAULT_REGION_ID = "--1";
+ private static final Duration CONNECT_STATE_CHECK_INTERVAL = Duration.ofSeconds(2);
+ private static final int CONNECT_STATE_REFRESH_THREAD_POOL_SIZE = 1;
+ private static final Duration REFRESH_LEADER_TIME_OUT = Duration.ofSeconds(2);
+ private static final int MAX_RANDOM_DELAY_MS = 500;
+ private ConnectionState currentConnectionState;
+ private final RaftRegistryProperties properties;
+ private final List connectionListeners = new CopyOnWriteArrayList<>();
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final CliOptions cliOptions;
+ private final CliClientService cliClientService;
+
+ public RaftConnectionStateManager(RaftRegistryProperties properties) {
+ this.properties = properties;
+ this.cliOptions = new CliOptions();
+ this.cliClientService = new CliClientServiceImpl();
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(
+ CONNECT_STATE_REFRESH_THREAD_POOL_SIZE,
+ new ThreadFactoryBuilder().setNameFormat("ConnectionStateRefreshThread").setDaemon(true).build());
+ }
+ @Override
+ public void start() {
+ cliClientService.init(cliOptions);
+ scheduledExecutorService.scheduleWithFixedDelay(
+ new ConnectionStateRefreshTask(connectionListeners),
+ getRandomizedDelay(CONNECT_STATE_CHECK_INTERVAL.toMillis()),
+ getRandomizedDelay(CONNECT_STATE_CHECK_INTERVAL.toMillis()),
+ TimeUnit.MILLISECONDS);
+ }
+
+ private long getRandomizedDelay(long baseDelay) {
+ // Add a random value in the range [0, RANDOM_DELAY_RANGE_MS]
+ Random random = new Random();
+ long randomOffset = random.nextInt(MAX_RANDOM_DELAY_MS + 1);
+ return baseDelay + randomOffset;
+ }
+
+ @Override
+ public void addConnectionListener(ConnectionListener listener) {
+ connectionListeners.add(listener);
+ }
+
+ @Override
+ public ConnectionState getConnectionState() {
+ return currentConnectionState;
+ }
+
+ class ConnectionStateRefreshTask implements Runnable {
+
+ private final List connectionListeners;
+ ConnectionStateRefreshTask(List connectionListeners) {
+ this.connectionListeners = connectionListeners;
+ }
+
+ @Override
+ public void run() {
+ try {
+ ConnectionState newConnectionState = getCurrentConnectionState();
+ if (newConnectionState == currentConnectionState) {
+ // no state change
+ return;
+ }
+ if (newConnectionState == ConnectionState.DISCONNECTED
+ && currentConnectionState == ConnectionState.CONNECTED) {
+ currentConnectionState = ConnectionState.DISCONNECTED;
+ triggerListeners(ConnectionState.DISCONNECTED);
+ } else if (newConnectionState == ConnectionState.CONNECTED
+ && currentConnectionState == ConnectionState.DISCONNECTED) {
+ currentConnectionState = ConnectionState.CONNECTED;
+ triggerListeners(ConnectionState.RECONNECTED);
+ } else if (currentConnectionState == null) {
+ currentConnectionState = newConnectionState;
+ triggerListeners(currentConnectionState);
+ }
+ } catch (Exception ex) {
+ log.error("raft registry connection state check failed", ex);
+ currentConnectionState = ConnectionState.DISCONNECTED;
+ triggerListeners(ConnectionState.DISCONNECTED);
+ }
+ }
+
+ private ConnectionState getCurrentConnectionState() {
+ try {
+ String groupId = properties.getClusterName() + DEFAULT_REGION_ID;
+ if (RouteTable.getInstance()
+ .refreshLeader(cliClientService, groupId, (int) REFRESH_LEADER_TIME_OUT.toMillis()).isOk()) {
+ return ConnectionState.CONNECTED;
+ } else {
+ return ConnectionState.DISCONNECTED;
+ }
+ } catch (Exception ex) {
+ log.error("cannot connect to raft leader", ex);
+ return ConnectionState.DISCONNECTED;
+ }
+ }
+
+ private void triggerListeners(ConnectionState connectionState) {
+ for (ConnectionListener connectionListener : connectionListeners) {
+ connectionListener.onUpdate(connectionState);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ connectionListeners.clear();
+ scheduledExecutorService.shutdownNow();
+ cliClientService.shutdown();
+ }
+
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftLockManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftLockManager.java
new file mode 100644
index 000000000000..f2c32f316cea
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftLockManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.manage;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.plugin.registry.raft.model.RaftLockEntry;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock;
+import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
+
+public class RaftLockManager implements IRaftLockManager {
+
+ private final Map distributedLockMap = new ConcurrentHashMap<>();
+ private final RheaKVStore rheaKvStore;
+ private static final ScheduledExecutorService WATCH_DOG = Executors.newSingleThreadScheduledExecutor();
+ private static final String LOCK_OWNER_PREFIX = NetUtils.getHost() + "_" + OSUtils.getProcessID() + "_";
+ private static final Duration DISTRIBUTE_LOCK_TIME_OUT = Duration.ofSeconds(3);
+ private static final Duration DISTRIBUTE_LOCK_RETRY_INTERVAL = Duration.ofMillis(50);
+
+ public RaftLockManager(RheaKVStore rheaKVStore) {
+ this.rheaKvStore = rheaKVStore;
+ }
+
+ @Override
+ public boolean acquireLock(String lockKey) {
+ final String lockOwner = getLockOwnerPrefix();
+ if (isThreadReentrant(lockKey, lockOwner)) {
+ return true;
+ }
+
+ final DistributedLock distributedLock = rheaKvStore.getDistributedLock(lockKey,
+ DISTRIBUTE_LOCK_TIME_OUT.toMillis(), TimeUnit.MILLISECONDS, WATCH_DOG);
+ while (true) {
+ if (distributedLock.tryLock()) {
+ distributedLockMap.put(lockKey, RaftLockEntry.builder().distributedLock(distributedLock)
+ .lockOwner(lockOwner)
+ .build());
+ return true;
+ } else {
+ // fail to acquire lock
+ ThreadUtils.sleep(DISTRIBUTE_LOCK_RETRY_INTERVAL.toMillis());
+ }
+ }
+ }
+
+ @Override
+ public boolean acquireLock(String lockKey, long timeout) {
+ final String lockOwner = getLockOwnerPrefix();
+ if (isThreadReentrant(lockKey, lockOwner)) {
+ return true;
+ }
+ final long endTime = System.currentTimeMillis() + timeout;
+ final DistributedLock distributedLock = rheaKvStore.getDistributedLock(lockKey,
+ DISTRIBUTE_LOCK_TIME_OUT.toMillis(), TimeUnit.MILLISECONDS, WATCH_DOG);
+
+ while (System.currentTimeMillis() < endTime) {
+ if (distributedLock.tryLock()) {
+ distributedLockMap.put(lockKey, RaftLockEntry.builder().distributedLock(distributedLock)
+ .lockOwner(lockOwner)
+ .build());
+ return true;
+ } else {
+ // fail to acquire lock
+ ThreadUtils.sleep(DISTRIBUTE_LOCK_RETRY_INTERVAL.toMillis());
+ }
+ }
+
+ return false;
+ }
+
+ private boolean isThreadReentrant(String lockKey, String lockOwner) {
+ final RaftLockEntry lockEntry = distributedLockMap.get(lockKey);
+ return lockEntry != null && lockOwner.equals(lockEntry.getLockOwner());
+ }
+
+ @Override
+ public boolean releaseLock(String lockKey) {
+ final String lockOwner = getLockOwnerPrefix();
+ final RaftLockEntry lockEntry = distributedLockMap.get(lockKey);
+ if (lockEntry == null || !lockOwner.equals(lockEntry.getLockOwner())) {
+ return false;
+ }
+
+ final DistributedLock distributedLock = distributedLockMap.get(lockKey).getDistributedLock();
+ if (distributedLock != null) {
+ distributedLock.unlock();
+ }
+ distributedLockMap.remove(lockKey);
+ return true;
+ }
+
+ public static String getLockOwnerPrefix() {
+ return LOCK_OWNER_PREFIX + Thread.currentThread().getName();
+ }
+
+ @Override
+ public void close() throws Exception {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(WATCH_DOG);
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java
new file mode 100644
index 000000000000..ff56b231aa65
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/manage/RaftSubscribeDataManager.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.manage;
+
+import static com.alipay.sofa.jraft.util.BytesUtil.readUtf8;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeItem;
+import org.apache.dolphinscheduler.plugin.registry.raft.model.NodeType;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.storage.KVEntry;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+@Slf4j
+public class RaftSubscribeDataManager implements IRaftSubscribeDataManager {
+
+ private final Map> dataSubScribeMap = new ConcurrentHashMap<>();
+ private final RheaKVStore kvStore;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private static final Duration LISTENER_CHECK_INTERVAL = Duration.ofSeconds(2);
+ private static final Duration HEART_BEAT_TIME_OUT = Duration.ofSeconds(20);
+ private static final int MAX_RANDOM_DELAY_MS = 500;
+ private static final int SUBSCRIBE_LISTENER_THREAD_POOL_SIZE = 1;
+
+ public RaftSubscribeDataManager(RheaKVStore kvStore) {
+ this.kvStore = kvStore;
+ this.scheduledExecutorService = Executors.newScheduledThreadPool(
+ SUBSCRIBE_LISTENER_THREAD_POOL_SIZE,
+ new ThreadFactoryBuilder().setNameFormat("SubscribeListenerCheckThread").setDaemon(true).build());
+ }
+
+ @Override
+ public void start() {
+ scheduledExecutorService.scheduleWithFixedDelay(new SubscribeCheckTask(),
+ getRandomizedDelay(LISTENER_CHECK_INTERVAL.toMillis()),
+ getRandomizedDelay(LISTENER_CHECK_INTERVAL.toMillis()),
+ TimeUnit.MILLISECONDS);
+ }
+
+ private long getRandomizedDelay(long baseDelay) {
+ // Add a random value in the range [0, MAX_RANDOM_DELAY_MS]
+ Random random = new Random();
+ long randomOffset = random.nextInt(MAX_RANDOM_DELAY_MS + 1);
+ return baseDelay + randomOffset;
+ }
+
+ @Override
+ public void addDataSubscribeListener(String path, SubscribeListener listener) {
+ final List subscribeListeners =
+ dataSubScribeMap.computeIfAbsent(path, k -> new ArrayList<>());
+ subscribeListeners.add(listener);
+ }
+
+ private class SubscribeCheckTask implements Runnable {
+
+ private final Map oldDataMap = new ConcurrentHashMap<>();
+
+ @Override
+ public void run() {
+ try {
+ final Map newDataMap = getNodeDataMap();
+ if (dataSubScribeMap.isEmpty() || newDataMap == null || newDataMap.isEmpty()) {
+ return;
+ }
+ // find the different
+ final Map addedData = new HashMap<>();
+ final Map deletedData = new HashMap<>();
+ final Map updatedData = new HashMap<>();
+
+ Iterator> iterator = newDataMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry entry = iterator.next();
+ final NodeItem oldData = oldDataMap.get(entry.getKey());
+ if (oldData == null) {
+ addedData.put(entry.getKey(), entry.getValue().getNodeValue());
+ } else if (NodeType.EPHEMERAL.getName().equals(entry.getValue().getNodeType())
+ && isUnHealthy(entry.getValue().getNodeValue())) {
+ kvStore.bDelete(entry.getKey());
+ iterator.remove();
+ } else if (!oldData.getNodeValue().equals(entry.getValue().getNodeValue())) {
+ updatedData.put(entry.getKey(), entry.getValue().getNodeValue());
+ }
+ }
+ for (Map.Entry entry : oldDataMap.entrySet()) {
+ if (!newDataMap.containsKey(entry.getKey())) {
+ deletedData.put(entry.getKey(), entry.getValue().getNodeValue());
+ }
+ }
+ oldDataMap.clear();
+ oldDataMap.putAll(newDataMap);
+ // trigger listener
+ triggerListener(addedData, deletedData, updatedData);
+ } catch (Exception ex) {
+ log.error("Error in SubscribeCheckTask run method", ex);
+ }
+ }
+
+ private boolean isUnHealthy(String heartBeat) {
+ try {
+ // consider this not a valid heartbeat instance, do not check
+ if (heartBeat == null || !heartBeat.contains("reportTime")) {
+ return false;
+ }
+ BaseHeartBeat baseHeartBeat = JSONUtils.parseObject(heartBeat, BaseHeartBeat.class);
+ if (baseHeartBeat != null) {
+ return System.currentTimeMillis() - baseHeartBeat.getReportTime() > HEART_BEAT_TIME_OUT.toMillis();
+ }
+ } catch (Exception ex) {
+ log.error("Fail to parse heartBeat : {}", heartBeat, ex);
+ }
+ return false;
+ }
+
+ private void triggerListener(Map addedData, Map deletedData,
+ Map updatedData) {
+ for (Map.Entry> entry : dataSubScribeMap.entrySet()) {
+ String subscribeKey = entry.getKey();
+ final List subscribeListeners = entry.getValue();
+ if (!addedData.isEmpty()) {
+ triggerListener(addedData, subscribeKey, subscribeListeners, Event.Type.ADD);
+ }
+ if (!updatedData.isEmpty()) {
+ triggerListener(updatedData, subscribeKey, subscribeListeners, Event.Type.UPDATE);
+ }
+ if (!deletedData.isEmpty()) {
+ triggerListener(deletedData, subscribeKey, subscribeListeners, Event.Type.REMOVE);
+ }
+ }
+ }
+
+ private Map getNodeDataMap() {
+ try {
+ final Map nodeItemMap = new HashMap<>();
+ final List entryList = kvStore.bScan(RegistryNodeType.ALL_SERVERS.getRegistryPath(),
+ RegistryNodeType.ALL_SERVERS.getRegistryPath() + Constants.SINGLE_SLASH
+ + Constants.RAFT_END_KEY);
+
+ for (KVEntry kvEntry : entryList) {
+ final String entryKey = readUtf8(kvEntry.getKey());
+ final String compositeValue = readUtf8(kvEntry.getValue());
+
+ if (StringUtils.isEmpty(compositeValue)
+ || !entryKey.startsWith(RegistryNodeType.ALL_SERVERS.getRegistryPath())) {
+ continue;
+ }
+
+ String[] nodeTypeAndValue = compositeValue.split(Constants.AT_SIGN);
+ if (nodeTypeAndValue.length < 2) {
+ continue;
+ }
+ String nodeType = nodeTypeAndValue[0];
+ String nodeValue = nodeTypeAndValue[1];
+
+ nodeItemMap.put(entryKey, NodeItem.builder().nodeValue(nodeValue).nodeType(nodeType).build());
+ }
+ return nodeItemMap;
+ } catch (Exception ex) {
+ log.error("Fail to getNodeDataMap", ex);
+ return null;
+ }
+ }
+
+ private void triggerListener(Map nodeDataMap, String subscribeKey,
+ List subscribeListeners, Event.Type type) {
+ for (Map.Entry entry : nodeDataMap.entrySet()) {
+ final String key = entry.getKey();
+ if (key.startsWith(subscribeKey)) {
+ subscribeListeners
+ .forEach(listener -> listener.notify(new Event(key, key, entry.getValue(), type)));
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ log.info("closing raft subscribe data manager");
+ dataSubScribeMap.clear();
+ scheduledExecutorService.shutdown();
+ log.info("raft subscribe data manager closed successfully");
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeItem.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeItem.java
new file mode 100644
index 000000000000..bc0ecf4eba3a
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeItem.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+public class NodeItem {
+
+ private String nodeType;
+ private String nodeValue;
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeType.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeType.java
new file mode 100644
index 000000000000..89074579d36f
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/NodeType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@AllArgsConstructor
+@Getter
+public enum NodeType {
+
+ EPHEMERAL("ephemeralNode"),
+ PERSISTENT("persistentNode");
+ private final String name;
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/RaftLockEntry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/RaftLockEntry.java
new file mode 100644
index 000000000000..3d22be56c270
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/model/RaftLockEntry.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.registry.raft.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import com.alipay.sofa.jraft.rhea.util.concurrent.DistributedLock;
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+public class RaftLockEntry {
+
+ private DistributedLock distributedLock;
+ private String lockOwner;
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegistryServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegistryServer.java
new file mode 100644
index 000000000000..23c12c6aa008
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/java/org/apache/dolphinscheduler/plugin/registry/raft/server/RaftRegistryServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.registry.raft.server;
+
+import org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryProperties;
+
+import lombok.extern.slf4j.Slf4j;
+
+import com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore;
+import com.alipay.sofa.jraft.rhea.client.RheaKVStore;
+import com.alipay.sofa.jraft.rhea.options.PlacementDriverOptions;
+import com.alipay.sofa.jraft.rhea.options.RheaKVStoreOptions;
+import com.alipay.sofa.jraft.rhea.options.StoreEngineOptions;
+import com.alipay.sofa.jraft.rhea.options.configured.PlacementDriverOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.RheaKVStoreOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.options.configured.StoreEngineOptionsConfigured;
+import com.alipay.sofa.jraft.rhea.storage.StorageType;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+@Slf4j
+public class RaftRegistryServer {
+
+ private final RheaKVStore rheaKVStore;
+ private final RheaKVStoreOptions options;
+ private volatile boolean started;
+ public RaftRegistryServer(RaftRegistryProperties raftRegistryProperties) {
+ final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
+ .withFake(true) // use a fake pd
+ .config();
+ final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured()
+ .withStorageType(StorageType.Memory)
+ .withRaftDataPath(raftRegistryProperties.getLogStorageDir())
+ .withServerAddress(
+ new Endpoint(raftRegistryProperties.getServerAddress(), raftRegistryProperties.getServerPort()))
+ .config();
+ this.options = RheaKVStoreOptionsConfigured.newConfigured()
+ .withClusterName(raftRegistryProperties.getClusterName())
+ .withInitialServerList(raftRegistryProperties.getServerAddressList())
+ .withStoreEngineOptions(storeOpts)
+ .withPlacementDriverOptions(pdOpts)
+ .config();
+ this.rheaKVStore = new DefaultRheaKVStore();
+ }
+
+ public void start() {
+ if (this.started) {
+ log.info("raft registry server has already started");
+ return;
+ }
+ log.info("starting raft registry server...");
+ this.rheaKVStore.init(this.options);
+ this.started = true;
+ log.info("raft registry server started successfully");
+ }
+
+ public void stop() {
+ log.info("stopping raft registry server");
+ if (this.rheaKVStore != null) {
+ this.rheaKVStore.shutdown();
+ }
+ this.started = false;
+ log.info("raft registry server stopped successfully");
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/resources/META-INF/spring.factories b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000000..cd245597ae71
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+ org.apache.dolphinscheduler.plugin.registry.raft.RaftRegistryAutoConfiguration
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryTestCase.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryTestCase.java
new file mode 100644
index 000000000000..5ccd2311e224
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/java/org/apache/dolphinscheduler/plugin/registry/raft/RaftRegistryTestCase.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.plugin.registry.raft;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.dolphinscheduler.common.model.BaseHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
+import org.apache.dolphinscheduler.registry.api.Event;
+import org.apache.dolphinscheduler.registry.api.RegistryException;
+import org.apache.dolphinscheduler.registry.api.SubscribeListener;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import lombok.SneakyThrows;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import com.google.common.truth.Truth;
+
+@SpringBootTest(classes = RaftRegistryProperties.class)
+@SpringBootApplication(scanBasePackageClasses = RaftRegistryProperties.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class RaftRegistryTestCase {
+
+ @Autowired
+ private RaftRegistry registry;
+
+ @Test
+ public void testPut() {
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "127.0.0.1:8080";
+ registry.put(key, value, true);
+ Truth.assertThat(registry.get(key)).isEqualTo(value);
+
+ // Update the value
+ registry.put(key, "123", true);
+ Truth.assertThat(registry.get(key)).isEqualTo("123");
+ }
+
+ @Test
+ public void testGet() {
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "127.0.0.1:8080";
+ assertThrows(RegistryException.class, () -> registry.get(key));
+ registry.put(key, value, true);
+ Truth.assertThat(registry.get(key)).isEqualTo(value);
+ }
+
+ @Test
+ public void testIsConnected() throws InterruptedException {
+ Truth.assertThat(registry.isConnected()).isTrue();
+ }
+
+ @Test
+ public void testConnectUntilTimeout() {
+ await().atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> registry.connectUntilTimeout(Duration.ofSeconds(3)));
+ }
+
+ @SneakyThrows
+ @Test
+ public void testEphemeralNodeSubscribe() {
+ final AtomicBoolean subscribeAdded = new AtomicBoolean(false);
+ final AtomicBoolean subscribeRemoved = new AtomicBoolean(false);
+ final AtomicBoolean subscribeUpdated = new AtomicBoolean(false);
+
+ SubscribeListener subscribeListener = event -> {
+ System.out.println("Receive event: " + event);
+ if (event.type() == Event.Type.ADD) {
+ subscribeAdded.compareAndSet(false, true);
+ }
+ if (event.type() == Event.Type.REMOVE) {
+ subscribeRemoved.compareAndSet(false, true);
+ }
+ if (event.type() == Event.Type.UPDATE) {
+ subscribeUpdated.compareAndSet(false, true);
+ }
+ };
+
+ String key = "/nodes/master/";
+ BaseHeartBeat baseHeartBeat = BaseHeartBeat.builder().reportTime(System.currentTimeMillis())
+ .host("127.0.0.1:8081")
+ .build();
+
+ registry.subscribe(key, subscribeListener);
+ registry.put(key, JSONUtils.toJsonString(baseHeartBeat), true);
+
+ Thread.sleep(3000);
+ BaseHeartBeat newBaseHeartBeat = BaseHeartBeat.builder().reportTime(System.currentTimeMillis())
+ .host("127.0.0.1:8081")
+ .build();
+ registry.put(key, JSONUtils.toJsonString(newBaseHeartBeat), true);
+ Thread.sleep(20000);
+
+ await().atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> {
+ Assertions.assertTrue(subscribeAdded.get());
+ Assertions.assertTrue(subscribeUpdated.get());
+ Assertions.assertTrue(subscribeRemoved.get());
+ });
+
+ // verify that the temporary node data has been removed
+ try {
+ String currentData = registry.get(key);
+ } catch (RegistryException ex) {
+ Assertions.assertEquals("key does not exist:" + key, ex.getMessage(),
+ "Unexpected registry exception message");
+ }
+ }
+
+ @SneakyThrows
+ @Test
+ public void testPersistentNodeSubscribe() {
+ final AtomicBoolean subscribeAdded = new AtomicBoolean(false);
+ final AtomicBoolean subscribeRemoved = new AtomicBoolean(false);
+ final AtomicBoolean subscribeUpdated = new AtomicBoolean(false);
+
+ SubscribeListener subscribeListener = event -> {
+ System.out.println("Receive event: " + event);
+ if (event.type() == Event.Type.ADD) {
+ subscribeAdded.compareAndSet(false, true);
+ }
+ if (event.type() == Event.Type.REMOVE) {
+ subscribeRemoved.compareAndSet(false, true);
+ }
+ if (event.type() == Event.Type.UPDATE) {
+ subscribeUpdated.compareAndSet(false, true);
+ }
+ };
+
+ String key = "/nodes/master/";
+ BaseHeartBeat baseHeartBeat = BaseHeartBeat.builder().reportTime(System.currentTimeMillis())
+ .host("127.0.0.1:8081")
+ .build();
+
+ registry.subscribe(key, subscribeListener);
+ registry.put(key, JSONUtils.toJsonString(baseHeartBeat), false);
+
+ Thread.sleep(3000);
+ BaseHeartBeat newBaseHeartBeat = BaseHeartBeat.builder().reportTime(System.currentTimeMillis())
+ .host("127.0.0.1:8081")
+ .build();
+ registry.put(key, JSONUtils.toJsonString(newBaseHeartBeat), false);
+ Thread.sleep(20000);
+
+ await().atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> {
+ Assertions.assertTrue(subscribeAdded.get());
+ Assertions.assertTrue(subscribeUpdated.get());
+ Assertions.assertFalse(subscribeRemoved.get());
+ });
+
+ String currentData = registry.get(key);
+ Assertions.assertNotNull(currentData, "Node data was unexpectedly removed");
+ Assertions.assertEquals(JSONUtils.toJsonString(newBaseHeartBeat), currentData,
+ "Node data does not match the expected value");
+ }
+
+ @SneakyThrows
+ @Test
+ public void testAddConnectionStateListener() {
+ AtomicReference connectionState = new AtomicReference<>();
+ registry.addConnectionStateListener(connectionState::set);
+ Truth.assertThat(connectionState.get()).isNull();
+ // after initialization, the connection status will change to connected
+ // await().atMost(Duration.ofSeconds(2))
+ // .until(() -> ConnectionState.CONNECTED == connectionState.get());
+ }
+
+ @Test
+ public void testDelete() {
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "127.0.0.1:8080";
+ // Delete a non-existent key
+ registry.delete(key);
+
+ registry.put(key, value, true);
+ Truth.assertThat(registry.get(key)).isEqualTo(value);
+ registry.delete(key);
+ Truth.assertThat(registry.exists(key)).isFalse();
+ }
+
+ @Test
+ public void testChildren() {
+ String master1 = "/nodes/master/127.0.0.1:8080";
+ String master2 = "/nodes/master/127.0.0.2:8080";
+ String value = "123";
+ registry.put(master1, value, true);
+ registry.put(master2, value, true);
+ System.out.println(registry.children("/nodes/master"));
+ Truth.assertThat(registry.children("/nodes/master"))
+ .containsAtLeastElementsIn(Lists.newArrayList("127.0.0.1:8080", "127.0.0.2:8080"));
+ }
+
+ @Test
+ public void testExists() {
+ String key = "/nodes/master" + System.nanoTime();
+ String value = "123";
+ Truth.assertThat(registry.exists(key)).isFalse();
+ registry.put(key, value, true);
+ Truth.assertThat(registry.exists(key)).isTrue();
+ }
+
+ @SneakyThrows
+ @Test
+ public void testAcquireLock() {
+ String lockKey = "/lock" + System.nanoTime();
+
+ // 1. Acquire the lock at the main thread
+ Truth.assertThat(registry.acquireLock(lockKey)).isTrue();
+ // Acquire the lock at the main thread again
+ // It should acquire success
+ Truth.assertThat(registry.acquireLock(lockKey)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire failed
+ CompletableFuture acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey));
+ assertThrows(TimeoutException.class, () -> acquireResult.get(3000, TimeUnit.MILLISECONDS));
+ }
+
+ @SneakyThrows
+ @Test
+ public void testAcquireLock_withTimeout() {
+ String lockKey = "/lock" + System.nanoTime();
+ // 1. Acquire the lock in the main thread
+ Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ // Acquire the lock in the main thread
+ // It should acquire success
+ Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire failed
+ CompletableFuture acquireResult =
+ CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
+ Truth.assertThat(acquireResult.get()).isFalse();
+ }
+
+ @SneakyThrows
+ @Test
+ public void testReleaseLock() {
+ String lockKey = "/lock" + System.nanoTime();
+ // 1. Acquire the lock in the main thread
+ Truth.assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire failed
+ CompletableFuture acquireResult =
+ CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
+ Truth.assertThat(acquireResult.get()).isFalse();
+
+ // 2. Release the lock in the main thread
+ Truth.assertThat(registry.releaseLock(lockKey)).isTrue();
+
+ // Acquire the lock at another thread
+ // It should acquire success
+ acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
+ Truth.assertThat(acquireResult.get()).isTrue();
+ }
+}
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/resources/application.yaml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/resources/application.yaml
new file mode 100644
index 000000000000..a0a63940e23b
--- /dev/null
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-raft/src/test/resources/application.yaml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+registry:
+ type: raft
+ clusterName: dolphinscheduler
+ serverAddressList: 127.0.0.1:8082
+ serverAddress: 127.0.0.1
+ serverPort: 8082
+ logStorageDir: raft-data/
+
+
diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
index aa184104d63d..d265a2bc0ad8 100644
--- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
+++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/pom.xml
@@ -33,5 +33,6 @@
dolphinscheduler-registry-jdbc
dolphinscheduler-registry-etcd
dolphinscheduler-registry-it
+ dolphinscheduler-registry-raft
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index e9d3e529ffa7..bde486329fca 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -527,4 +527,17 @@ tea-1.2.7.jar
tea-openapi-0.3.2.jar
tea-util-0.2.21.jar
tea-xml-0.1.5.jar
-
+affinity-3.1.7.jar
+annotations-12.0.jar
+bolt-1.6.4.jar
+disruptor-3.3.7.jar
+hessian-3.3.6.jar
+jctools-core-2.1.1.jar
+jraft-core-1.3.14.jar
+jraft-rheakv-core-1.3.14.jar
+protostuff-api-1.7.2.jar
+protostuff-collectionschema-1.7.2.jar
+protostuff-core-1.7.2.jar
+protostuff-runtime-1.7.2.jar
+rocksdbjni-8.8.1.jar
+sofa-common-tools-1.0.12.jar
\ No newline at end of file