diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
index 3f875f7047e2..3c6f9f1ba0a5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java
@@ -17,6 +17,9 @@
package org.apache.dolphinscheduler.server.master.cluster;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -36,6 +39,9 @@ public class ClusterManager {
@Getter
private WorkerClusters workerClusters;
+ @Autowired
+ private MasterSlotManager masterSlotManager;
+
@Autowired
private WorkerGroupChangeNotifier workerGroupChangeNotifier;
@@ -48,11 +54,48 @@ public ClusterManager() {
}
public void start() {
+ initializeMasterClusters();
+ initializeWorkerClusters();
+ log.info("ClusterManager started...");
+ }
+
+ /**
+ * Initialize the master clusters.
+ *
1. Register master slot listener once master clusters changed.
+ *
2. Fetch master nodes from registry.
+ *
3. Subscribe the master change event.
+ */
+ private void initializeMasterClusters() {
+ this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
+
+ registryClient.getServerList(RegistryNodeType.MASTER).forEach(server -> {
+ final MasterHeartBeat masterHeartBeat =
+ JSONUtils.parseObject(server.getHeartBeatInfo(), MasterHeartBeat.class);
+ masterClusters.onServerAdded(MasterServerMetadata.parseFromHeartBeat(masterHeartBeat));
+ });
+ log.info("Initialized MasterClusters: {}", JSONUtils.toPrettyJsonString(masterClusters.getServers()));
+
this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters);
+ }
+
+ /**
+ * Initialize the worker clusters.
+ *
1. Fetch worker nodes from registry.
+ *
2. Register worker group change notifier once worker clusters changed.
+ *
3. Subscribe the worker change event.
+ */
+ private void initializeWorkerClusters() {
+ registryClient.getServerList(RegistryNodeType.WORKER).forEach(server -> {
+ final WorkerHeartBeat workerHeartBeat =
+ JSONUtils.parseObject(server.getHeartBeatInfo(), WorkerHeartBeat.class);
+ workerClusters.onServerAdded(WorkerServerMetadata.parseFromHeartBeat(workerHeartBeat));
+ });
+ log.info("Initialized WorkerClusters: {}", JSONUtils.toPrettyJsonString(workerClusters.getServers()));
+
this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters);
+
this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters);
this.workerGroupChangeNotifier.start();
- log.info("ClusterManager started...");
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java
new file mode 100644
index 000000000000..8918fa794d45
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cluster;
+
+import java.util.List;
+
+public interface IMasterSlotChangeListener {
+
+ void onMasterSlotChanged(final List normalMasterServers);
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
index f68e13d7e9f6..3ec7c35efd82 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.cluster;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
@@ -32,6 +34,7 @@
public class MasterServerMetadata extends BaseServerMetadata implements Comparable {
public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) {
+ checkNotNull(masterHeartBeat);
return MasterServerMetadata.builder()
.processId(masterHeartBeat.getProcessId())
.serverStartupTime(masterHeartBeat.getStartupTime())
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java
new file mode 100644
index 000000000000..be280f1ed52c
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.cluster;
+
+import java.util.List;
+
+public class MasterSlotChangeListenerAdaptor
+ implements
+ IMasterSlotChangeListener,
+ IClusters.IClustersChangeListener {
+
+ private final MasterSlotManager masterSlotManager;
+
+ private final MasterClusters masterClusters;
+
+ public MasterSlotChangeListenerAdaptor(final MasterSlotManager masterSlotManager,
+ final MasterClusters masterClusters) {
+ this.masterSlotManager = masterSlotManager;
+ this.masterClusters = masterClusters;
+ }
+
+ @Override
+ public void onMasterSlotChanged(final List normalMasterServers) {
+ masterSlotManager.doReBalance(normalMasterServers);
+ }
+
+ @Override
+ public void onServerAdded(MasterServerMetadata server) {
+ onMasterSlotChanged(masterClusters.getNormalServers());
+ }
+
+ @Override
+ public void onServerRemove(MasterServerMetadata server) {
+ onMasterSlotChanged(masterClusters.getNormalServers());
+ }
+
+ @Override
+ public void onServerUpdate(MasterServerMetadata server) {
+ onMasterSlotChanged(masterClusters.getNormalServers());
+ }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
index 4e619b2fcc81..67660bd038a9 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java
@@ -29,34 +29,14 @@
@Component
public class MasterSlotManager implements IMasterSlotReBalancer {
- private final MasterClusters masterClusters;
-
private final MasterConfig masterConfig;
private volatile int currentSlot = -1;
private volatile int totalSlots = 0;
- public MasterSlotManager(ClusterManager clusterManager, MasterConfig masterConfig) {
+ public MasterSlotManager(final MasterConfig masterConfig) {
this.masterConfig = masterConfig;
- this.masterClusters = clusterManager.getMasterClusters();
- this.masterClusters.registerListener(new IClusters.IClustersChangeListener() {
-
- @Override
- public void onServerAdded(MasterServerMetadata server) {
- doReBalance(masterClusters.getNormalServers());
- }
-
- @Override
- public void onServerRemove(MasterServerMetadata server) {
- doReBalance(masterClusters.getNormalServers());
- }
-
- @Override
- public void onServerUpdate(MasterServerMetadata server) {
- doReBalance(masterClusters.getNormalServers());
- }
- });
}
/**
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
index 56d119d776e8..0b4f659ebc0d 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java
@@ -29,16 +29,16 @@ class MasterSlotManagerTest {
private MasterSlotManager masterSlotManager;
- private ClusterManager clusterManager;
+ private MasterClusters masterClusters;
private MasterConfig masterConfig;
@BeforeEach
public void setUp() {
- clusterManager = new ClusterManager();
+ masterClusters = new MasterClusters();
masterConfig = new MasterConfig();
masterConfig.setMasterAddress("127.0.0.1:5678");
- masterSlotManager = new MasterSlotManager(clusterManager, masterConfig);
+ masterSlotManager = new MasterSlotManager(masterConfig);
MasterServerMetadata master1 = MasterServerMetadata.builder()
.cpuUsage(0.2)
.memoryUsage(0.4)
@@ -63,10 +63,11 @@ public void setUp() {
.serverStatus(ServerStatus.BUSY)
.address("127.0.0.4:5679")
.build();
- clusterManager.getMasterClusters().onServerAdded(master1);
- clusterManager.getMasterClusters().onServerAdded(master2);
- clusterManager.getMasterClusters().onServerAdded(master3);
- clusterManager.getMasterClusters().onServerAdded(master4);
+ this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters));
+ masterClusters.onServerAdded(master1);
+ masterClusters.onServerAdded(master2);
+ masterClusters.onServerAdded(master3);
+ masterClusters.onServerAdded(master4);
}
@Test
@@ -98,8 +99,8 @@ void doReBalance() {
.serverStatus(ServerStatus.BUSY)
.address("127.0.0.4:5679")
.build();
- clusterManager.getMasterClusters().onServerRemove(master2);
- clusterManager.getMasterClusters().onServerRemove(master3);
+ masterClusters.onServerRemove(master2);
+ masterClusters.onServerRemove(master3);
// After doReBalance, the total master slots should be 2
assertThat(masterSlotManager.getTotalMasterSlots()).isEqualTo(2);
}