Use ClusterManager to manage the cluster info
ruanwenjun committed Jul 2, 2024
1 parent 1fe8a6e commit 76d7a34
Showing 82 changed files with 2,157 additions and 2,454 deletions.
48 changes: 24 additions & 24 deletions docs/docs/en/architecture/configuration.md

Large diffs are not rendered by default.

48 changes: 24 additions & 24 deletions docs/docs/zh/architecture/configuration.md
@@ -284,30 +284,30 @@ The common.properties configuration file is currently mainly used to configure hadoop/s3/yarn/applicationId

Location: `master-server/conf/application.yaml`

| Parameter | Default value | Description |
|---|---|---|
| master.listen-port | 5678 | master listen port |
| master.pre-exec-threads | 10 | number of tasks master prepares for execution, used to limit the number of commands handled in parallel |
| master.exec-threads | 100 | number of master worker threads, used to limit the number of workflow instances running in parallel |
| master.dispatch-task-number | 3 | number of tasks master dispatches per batch |
| master.host-selector | lower_weight | master host selector, used to pick a suitable worker to run a task; options: random, round_robin, lower_weight |
| master.max-heartbeat-interval | 10s | maximum master heartbeat interval |
| master.task-commit-retry-times | 5 | number of task commit retries |
| master.task-commit-interval | 1000 | task commit interval, in milliseconds |
| master.state-wheel-interval | 5 | interval for polling state checks |
| master.server-load-protection.enabled | true | whether to enable the system protection strategy |
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | maximum system CPU usage for master; master schedules tasks only while the current system CPU usage is below this value. The default of 0.7 allows up to 70% of the system CPU |
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | maximum JVM CPU usage for master; master schedules tasks only while the current JVM CPU usage is below this value. The default of 0.7 allows up to 70% of the JVM CPU |
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | maximum system memory usage for master; master schedules tasks only while the current system memory usage is below this value. The default of 0.7 allows up to 70% of the system memory |
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | maximum system disk usage for master; master schedules tasks only while the current system disk usage is below this value. The default of 0.7 allows up to 70% of the system disk space |
| master.failover-interval | 10 | failover interval, in minutes |
| master.kill-application-when-task-failover | true | whether to kill the yarn or k8s application when a task instance fails over |
| master.registry-disconnect-strategy.strategy | stop | strategy applied when Master loses its connection to the registry; default: stop; options: stop, waiting |
| master.registry-disconnect-strategy.max-waiting-time | 100s | reconnect window after Master loses its connection to the registry; takes effect only when strategy is waiting. Master tries to reconnect within the given time and stops itself if reconnection fails within that time. While reconnecting, Master discards the workflows it is currently running. A value of 0 means waiting indefinitely |
| master.master.worker-group-refresh-interval | 10s | interval at which workerGroup is periodically synced from the database into memory |
| master.command-fetch-strategy.type | ID_SLOT_BASED | Command fetch strategy; currently only `ID_SLOT_BASED` is supported |
| master.command-fetch-strategy.config.id-step | 1 | auto-increment step of the id column of t_ds_command in the database |
| master.command-fetch-strategy.config.fetch-size | 10 | number of commands master fetches |
| Parameter | Default value | Description |
|---|---|---|
| master.listen-port | 5678 | master listen port |
| master.pre-exec-threads | 10 | number of tasks master prepares for execution, used to limit the number of commands handled in parallel |
| master.exec-threads | 100 | number of master worker threads, used to limit the number of workflow instances running in parallel |
| master.dispatch-task-number | 3 | number of tasks master dispatches per batch |
| master.worker-load-balancer.type | DYNAMIC_WEIGHTED_ROUND_ROBIN | Master uses each worker's dynamic CPU, memory, and thread pool usage to compute the worker's load; workers with a lower load have a higher chance of being dispatched tasks |
| master.max-heartbeat-interval | 10s | maximum master heartbeat interval |
| master.task-commit-retry-times | 5 | number of task commit retries |
| master.task-commit-interval | 1000 | task commit interval, in milliseconds |
| master.state-wheel-interval | 5 | interval for polling state checks |
| master.server-load-protection.enabled | true | whether to enable the system protection strategy |
| master.server-load-protection.max-system-cpu-usage-percentage-thresholds | 0.7 | maximum system CPU usage for master; master schedules tasks only while the current system CPU usage is below this value. The default of 0.7 allows up to 70% of the system CPU |
| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds | 0.7 | maximum JVM CPU usage for master; master schedules tasks only while the current JVM CPU usage is below this value. The default of 0.7 allows up to 70% of the JVM CPU |
| master.server-load-protection.max-system-memory-usage-percentage-thresholds | 0.7 | maximum system memory usage for master; master schedules tasks only while the current system memory usage is below this value. The default of 0.7 allows up to 70% of the system memory |
| master.server-load-protection.max-disk-usage-percentage-thresholds | 0.7 | maximum system disk usage for master; master schedules tasks only while the current system disk usage is below this value. The default of 0.7 allows up to 70% of the system disk space |
| master.failover-interval | 10 | failover interval, in minutes |
| master.kill-application-when-task-failover | true | whether to kill the yarn or k8s application when a task instance fails over |
| master.registry-disconnect-strategy.strategy | stop | strategy applied when Master loses its connection to the registry; default: stop; options: stop, waiting |
| master.registry-disconnect-strategy.max-waiting-time | 100s | reconnect window after Master loses its connection to the registry; takes effect only when strategy is waiting. Master tries to reconnect within the given time and stops itself if reconnection fails within that time. While reconnecting, Master discards the workflows it is currently running. A value of 0 means waiting indefinitely |
| master.master.worker-group-refresh-interval | 10s | interval at which workerGroup is periodically synced from the database into memory |
| master.command-fetch-strategy.type | ID_SLOT_BASED | Command fetch strategy; currently only `ID_SLOT_BASED` is supported |
| master.command-fetch-strategy.config.id-step | 1 | auto-increment step of the id column of t_ds_command in the database |
| master.command-fetch-strategy.config.fetch-size | 10 | number of commands master fetches |
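
The `master.worker-load-balancer.type` row describes load-based dispatching only in words, so here is a minimal sketch of one way such a balancer could behave. It is not the implementation from this commit: the class `DynamicWeightedRoundRobinSketch`, the weight formula (one minus the mean of the three usage ratios), the sample addresses, and the choice of smooth weighted round robin as the selection scheme are all assumptions made for illustration. A real balancer would also refresh the weights from worker heartbeats, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the class name, fields and weight formula are assumptions,
// not the implementation shipped in this commit.
public class DynamicWeightedRoundRobinSketch {

    private static final class Worker {
        final String address;
        final double weight;        // higher weight means lower load
        double currentWeight = 0.0; // running counter for smooth weighted round robin

        Worker(String address, double cpuUsage, double memoryUsage, double threadPoolUsage) {
            this.address = address;
            // Assumed formula: load is the mean of three usage ratios, each in [0, 1].
            double load = (cpuUsage + memoryUsage + threadPoolUsage) / 3.0;
            this.weight = Math.max(0.0, 1.0 - load);
        }
    }

    private final List<Worker> workers = new ArrayList<>();

    public void addWorker(String address, double cpuUsage, double memoryUsage, double threadPoolUsage) {
        workers.add(new Worker(address, cpuUsage, memoryUsage, threadPoolUsage));
    }

    // Smooth weighted round robin: raise every counter by its weight, pick the
    // largest counter, then lower the winner by the sum of all weights.
    public String select() {
        if (workers.isEmpty()) {
            throw new IllegalStateException("no workers registered");
        }
        double totalWeight = 0.0;
        Worker best = null;
        for (Worker worker : workers) {
            worker.currentWeight += worker.weight;
            totalWeight += worker.weight;
            if (best == null || worker.currentWeight > best.currentWeight) {
                best = worker;
            }
        }
        best.currentWeight -= totalWeight;
        return best.address;
    }

    // Counts how often each worker is selected over the given number of rounds.
    public Map<String, Integer> simulate(int rounds) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < rounds; i++) {
            counts.merge(select(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        DynamicWeightedRoundRobinSketch balancer = new DynamicWeightedRoundRobinSketch();
        balancer.addWorker("worker-idle:1234", 0.2, 0.2, 0.2);
        balancer.addWorker("worker-busy:1234", 0.8, 0.8, 0.8);
        System.out.println(balancer.simulate(100));
    }
}
```

In this sketch the lightly loaded worker is picked far more often than the heavily loaded one, yet the busy worker is never starved entirely, which matches the "higher chance" wording in the table.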

## Worker Server related configuration

2 changes: 1 addition & 1 deletion dolphinscheduler-bom/pom.xml
@@ -389,7 +389,7 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector.version}</version>
<scope>test</scope>
<!-- <scope>test</scope>-->
</dependency>

<dependency>
@@ -29,6 +29,6 @@
public class WorkerHeartBeat extends BaseHeartBeat implements HeartBeat {

    private int workerHostWeight; // worker host weight
    private int threadPoolUsage; // worker waiting task count
    private double threadPoolUsage; // worker waiting task count

}
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.utils;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Computes the keys and values to add, remove, or update when moving from an old map to a new map.
 */
public class MapComparator<K, V> {

    private final Map<K, V> oldMap;
    private final Map<K, V> newMap;

    public MapComparator(Map<K, V> oldMap, Map<K, V> newMap) {
        this.oldMap = oldMap;
        this.newMap = newMap;
    }

    // Keys present only in the new map.
    public Set<K> getKeysToAdd() {
        Set<K> keysToAdd = new HashSet<>(newMap.keySet());
        keysToAdd.removeAll(oldMap.keySet());
        return keysToAdd;
    }

    public List<V> getValuesToAdd() {
        return getKeysToAdd().stream().map(newMap::get).collect(Collectors.toList());
    }

    // Keys present only in the old map.
    public Set<K> getKeysToRemove() {
        Set<K> keysToRemove = new HashSet<>(oldMap.keySet());
        keysToRemove.removeAll(newMap.keySet());
        return keysToRemove;
    }

    public List<V> getValuesToRemove() {
        return getKeysToRemove().stream().map(oldMap::get).collect(Collectors.toList());
    }

    // Keys present in both maps whose values differ.
    public Set<K> getKeysToUpdate() {
        Set<K> keysToUpdate = new HashSet<>(newMap.keySet());
        keysToUpdate.retainAll(oldMap.keySet());
        // Objects.equals avoids a NullPointerException when the new value is null.
        keysToUpdate.removeIf(key -> Objects.equals(newMap.get(key), oldMap.get(key)));

        return keysToUpdate;
    }

    public List<V> getNewValuesToUpdate() {
        return getKeysToUpdate().stream().map(newMap::get).collect(Collectors.toList());
    }
}
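
To show how a comparator like this can diff two snapshots of cluster state, here is a small usage sketch. The `MapComparatorDemo` class, its `describe` helper, and the sample worker addresses and weights are hypothetical and do not appear in the commit. The nested comparator is a condensed copy of the key-diff methods above, written with a null-safe `Objects.equals` comparison, so that the snippet compiles on its own.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical demo class; not part of the commit.
public class MapComparatorDemo {

    // Condensed copy of the key-diff logic, so the sketch is self-contained.
    static class MapComparator<K, V> {
        private final Map<K, V> oldMap;
        private final Map<K, V> newMap;

        MapComparator(Map<K, V> oldMap, Map<K, V> newMap) {
            this.oldMap = oldMap;
            this.newMap = newMap;
        }

        Set<K> getKeysToAdd() {
            Set<K> keys = new HashSet<>(newMap.keySet());
            keys.removeAll(oldMap.keySet());
            return keys;
        }

        Set<K> getKeysToRemove() {
            Set<K> keys = new HashSet<>(oldMap.keySet());
            keys.removeAll(newMap.keySet());
            return keys;
        }

        Set<K> getKeysToUpdate() {
            Set<K> keys = new HashSet<>(newMap.keySet());
            keys.retainAll(oldMap.keySet());
            keys.removeIf(key -> Objects.equals(newMap.get(key), oldMap.get(key)));
            return keys;
        }
    }

    // Summarizes the difference between two snapshots; TreeSet keeps the output order stable.
    public static String describe(Map<String, Integer> oldSnapshot, Map<String, Integer> newSnapshot) {
        MapComparator<String, Integer> comparator = new MapComparator<>(oldSnapshot, newSnapshot);
        return "add=" + new TreeSet<>(comparator.getKeysToAdd())
                + " remove=" + new TreeSet<>(comparator.getKeysToRemove())
                + " update=" + new TreeSet<>(comparator.getKeysToUpdate());
    }

    public static void main(String[] args) {
        // Illustrative data: worker address mapped to host weight.
        Map<String, Integer> previous = Map.of("worker-1:1234", 100, "worker-2:1234", 100);
        Map<String, Integer> current = Map.of("worker-2:1234", 50, "worker-3:1234", 100);
        // prints "add=[worker-3:1234] remove=[worker-1:1234] update=[worker-2:1234]"
        System.out.println(describe(previous, current));
    }
}
```

Splitting the result into add, remove, and update sets lets a caller react to each kind of change separately instead of rebuilding its whole view on every refresh.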
@@ -65,6 +65,8 @@ public class AlertDao {

    private static final Integer QUERY_ALERT_THRESHOLD = 100;

    private static final int ADMIN_ALERT_GROUP_ID = 1;

    @Value("${alert.alarm-suppression.crash:60}")
    private Integer crashAlarmSuppression;

@@ -159,11 +161,10 @@ public int insertAlertSendStatus(List<AlertSendStatus> alertSendStatuses) {
    /**
     * MasterServer or WorkerServer stopped
     *
     * @param alertGroupId alertGroupId
     * @param host host
     * @param serverType serverType
     */
    public void sendServerStoppedAlert(int alertGroupId, String host, String serverType) {
    public void sendServerStoppedAlert(String host, String serverType) {
        ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().type(serverType)
                .host(host)
                .event(AlertEvent.SERVER_DOWN)
@@ -175,7 +176,7 @@ public void sendServerStoppedAlert(int alertGroupId, String host, String serverT
        alert.setWarningType(WarningType.FAILURE);
        alert.setAlertStatus(AlertStatus.WAIT_EXECUTION);
        alert.setContent(content);
        alert.setAlertGroupId(alertGroupId);
        alert.setAlertGroupId(ADMIN_ALERT_GROUP_ID);
        alert.setCreateTime(new Date());
        alert.setUpdateTime(new Date());
        alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING);
@@ -19,18 +19,21 @@

import java.util.Date;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

/**
* worker group
*/
@TableName("t_ds_worker_group")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkerGroup {

@TableId(value = "id", type = IdType.AUTO)
@@ -15,14 +15,9 @@
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.dispatch.enums;
package org.apache.dolphinscheduler.dao.repository;

// todo: refactor this enum
public enum ExecutorType {
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;

WORKER,

CLIENT,
MASTER,
;
public interface WorkerGroupDao extends IDao<WorkerGroup> {
}