From f823b6a1cbb0413ab5cc5941e98266803c98b1bb Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Thu, 23 Jan 2025 17:00:39 +0800 Subject: [PATCH] [Improvement-16982][Master] When master startup, initialize the cluster from registry --- .../server/master/cluster/ClusterManager.java | 45 ++++++++++++++- .../cluster/IMasterSlotChangeListener.java | 25 +++++++++ .../master/cluster/MasterServerMetadata.java | 3 + .../MasterSlotChangeListenerAdaptor.java | 56 +++++++++++++++++++ .../master/cluster/MasterSlotManager.java | 22 +------- .../master/cluster/MasterSlotManagerTest.java | 19 ++++--- 6 files changed, 139 insertions(+), 31 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java index 3f875f7047e2..3c6f9f1ba0a5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java @@ -17,6 +17,9 @@ package org.apache.dolphinscheduler.server.master.cluster; +import org.apache.dolphinscheduler.common.model.MasterHeartBeat; +import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; @@ -36,6 +39,9 @@ public class ClusterManager { @Getter private WorkerClusters workerClusters; + @Autowired + private MasterSlotManager masterSlotManager; + @Autowired private WorkerGroupChangeNotifier workerGroupChangeNotifier; @@ -48,11 +54,48 @@ public ClusterManager() { } public void start() { + initializeMasterClusters(); + initializeWorkerClusters(); + log.info("ClusterManager started..."); + } + + /** + * Initialize the master clusters. + *

1. Register master slot listener once master clusters changed. + *

2. Fetch master nodes from registry. + *

3. Subscribe the master change event. + */ + private void initializeMasterClusters() { + this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters)); + + registryClient.getServerList(RegistryNodeType.MASTER).forEach(server -> { + final MasterHeartBeat masterHeartBeat = + JSONUtils.parseObject(server.getHeartBeatInfo(), MasterHeartBeat.class); + masterClusters.onServerAdded(MasterServerMetadata.parseFromHeartBeat(masterHeartBeat)); + }); + log.info("Initialized MasterClusters: {}", JSONUtils.toPrettyJsonString(masterClusters.getServers())); + this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters); + } + + /** + * Initialize the worker clusters. + *

1. Fetch worker nodes from registry. + *

2. Register worker group change notifier once worker clusters changed. + *

3. Subscribe the worker change event. + */ + private void initializeWorkerClusters() { + registryClient.getServerList(RegistryNodeType.WORKER).forEach(server -> { + final WorkerHeartBeat workerHeartBeat = + JSONUtils.parseObject(server.getHeartBeatInfo(), WorkerHeartBeat.class); + workerClusters.onServerAdded(WorkerServerMetadata.parseFromHeartBeat(workerHeartBeat)); + }); + log.info("Initialized WorkerClusters: {}", JSONUtils.toPrettyJsonString(workerClusters.getServers())); + this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters); + this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters); this.workerGroupChangeNotifier.start(); - log.info("ClusterManager started..."); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java new file mode 100644 index 000000000000..8918fa794d45 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/IMasterSlotChangeListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cluster; + +import java.util.List; + +public interface IMasterSlotChangeListener { + + void onMasterSlotChanged(final List normalMasterServers); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java index f68e13d7e9f6..3ec7c35efd82 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterServerMetadata.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.cluster; +import static com.google.common.base.Preconditions.checkNotNull; + import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.model.MasterHeartBeat; @@ -32,6 +34,7 @@ public class MasterServerMetadata extends BaseServerMetadata implements Comparable { public static MasterServerMetadata parseFromHeartBeat(final MasterHeartBeat masterHeartBeat) { + checkNotNull(masterHeartBeat); return MasterServerMetadata.builder() .processId(masterHeartBeat.getProcessId()) .serverStartupTime(masterHeartBeat.getStartupTime()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java new file mode 100644 index 000000000000..be280f1ed52c --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotChangeListenerAdaptor.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.cluster; + +import java.util.List; + +public class MasterSlotChangeListenerAdaptor + implements + IMasterSlotChangeListener, + IClusters.IClustersChangeListener { + + private final MasterSlotManager masterSlotManager; + + private final MasterClusters masterClusters; + + public MasterSlotChangeListenerAdaptor(final MasterSlotManager masterSlotManager, + final MasterClusters masterClusters) { + this.masterSlotManager = masterSlotManager; + this.masterClusters = masterClusters; + } + + @Override + public void onMasterSlotChanged(final List normalMasterServers) { + masterSlotManager.doReBalance(normalMasterServers); + } + + @Override + public void onServerAdded(MasterServerMetadata server) { + onMasterSlotChanged(masterClusters.getNormalServers()); + } + + @Override + public void onServerRemove(MasterServerMetadata server) { + onMasterSlotChanged(masterClusters.getNormalServers()); + } + + @Override + public void onServerUpdate(MasterServerMetadata server) { + onMasterSlotChanged(masterClusters.getNormalServers()); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java index 4e619b2fcc81..67660bd038a9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManager.java @@ -29,34 +29,14 @@ @Component public class MasterSlotManager implements IMasterSlotReBalancer { - private final MasterClusters masterClusters; - private final MasterConfig masterConfig; private volatile int currentSlot = -1; private volatile int totalSlots = 0; - public MasterSlotManager(ClusterManager clusterManager, MasterConfig masterConfig) { + public MasterSlotManager(final MasterConfig masterConfig) { this.masterConfig = masterConfig; - this.masterClusters = clusterManager.getMasterClusters(); - this.masterClusters.registerListener(new IClusters.IClustersChangeListener() { - - @Override - public void onServerAdded(MasterServerMetadata server) { - doReBalance(masterClusters.getNormalServers()); - } - - @Override - public void onServerRemove(MasterServerMetadata server) { - doReBalance(masterClusters.getNormalServers()); - } - - @Override - public void onServerUpdate(MasterServerMetadata server) { - doReBalance(masterClusters.getNormalServers()); - } - }); } /** diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java index 56d119d776e8..0b4f659ebc0d 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/MasterSlotManagerTest.java @@ -29,16 +29,16 @@ class MasterSlotManagerTest { private MasterSlotManager masterSlotManager; - private ClusterManager clusterManager; + private MasterClusters masterClusters; private MasterConfig masterConfig; @BeforeEach public void setUp() { - clusterManager = new ClusterManager(); + masterClusters = new MasterClusters(); masterConfig = new MasterConfig(); masterConfig.setMasterAddress("127.0.0.1:5678"); - masterSlotManager = new MasterSlotManager(clusterManager, masterConfig); + masterSlotManager = new MasterSlotManager(masterConfig); MasterServerMetadata master1 = MasterServerMetadata.builder() .cpuUsage(0.2) .memoryUsage(0.4) @@ -63,10 +63,11 @@ public void setUp() { .serverStatus(ServerStatus.BUSY) .address("127.0.0.4:5679") .build(); - clusterManager.getMasterClusters().onServerAdded(master1); - clusterManager.getMasterClusters().onServerAdded(master2); - clusterManager.getMasterClusters().onServerAdded(master3); - clusterManager.getMasterClusters().onServerAdded(master4); + this.masterClusters.registerListener(new MasterSlotChangeListenerAdaptor(masterSlotManager, masterClusters)); + masterClusters.onServerAdded(master1); + masterClusters.onServerAdded(master2); + masterClusters.onServerAdded(master3); + masterClusters.onServerAdded(master4); } @Test @@ -98,8 +99,8 @@ void doReBalance() { .serverStatus(ServerStatus.BUSY) .address("127.0.0.4:5679") .build(); - clusterManager.getMasterClusters().onServerRemove(master2); - clusterManager.getMasterClusters().onServerRemove(master3); + masterClusters.onServerRemove(master2); + masterClusters.onServerRemove(master3); // After doReBalance, the total master slots should be 2 assertThat(masterSlotManager.getTotalMasterSlots()).isEqualTo(2); }