diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java index 54cb12e077b1..9f34e2923e4f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/ClusterManager.java @@ -51,6 +51,7 @@ public void start() { this.registryClient.subscribe(RegistryNodeType.MASTER.getRegistryPath(), masterClusters); this.registryClient.subscribe(RegistryNodeType.WORKER.getRegistryPath(), workerClusters); this.workerGroupChangeNotifier.subscribeWorkerGroupsChange(workerClusters); + this.workerGroupChangeNotifier.startScheduleThread(); log.info("ClusterManager started..."); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java index 8972a21549af..3bc3b588c5c0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java @@ -20,11 +20,12 @@ import org.apache.dolphinscheduler.common.utils.MapComparator; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.utils.MasterThreadFactory; import org.apache.commons.collections4.CollectionUtils; -import java.time.Duration; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -35,6 +36,7 @@ import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -44,7 +46,8 @@ @Component public class WorkerGroupChangeNotifier { - private static final long DEFAULT_REFRESH_WORKER_INTERVAL = 10; + @Autowired + private MasterConfig masterConfig; private final WorkerGroupDao workerGroupDao; private final List listeners = new CopyOnWriteArrayList<>(); @@ -53,17 +56,21 @@ public class WorkerGroupChangeNotifier { public WorkerGroupChangeNotifier(WorkerGroupDao workerGroupDao) { this.workerGroupDao = workerGroupDao; + } + + public void startScheduleThread() { detectWorkerGroupChanges(); + final long workerGroupRefreshIntervalSeconds = masterConfig.getWorkerGroupRefreshInterval().getSeconds(); MasterThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( this::detectWorkerGroupChanges, - DEFAULT_REFRESH_WORKER_INTERVAL, - DEFAULT_REFRESH_WORKER_INTERVAL, + workerGroupRefreshIntervalSeconds, + workerGroupRefreshIntervalSeconds, TimeUnit.SECONDS); } public void subscribeWorkerGroupsChange(WorkerGroupListener listener) { - - //add all group when listener added + + // add all group when listener added listener.onWorkerGroupAdd(new ArrayList<>(workerGroupMap.values())); listeners.add(listener);