Commit
fix(backend): sync cluster capacity stats TencentBlueKing#5861
xfwduke authored and zhangzhw8 committed Jul 29, 2024
1 parent 39d731f commit 9272d9f
Showing 4 changed files with 86 additions and 58 deletions.
4 changes: 2 additions & 2 deletions dbm-ui/backend/db_meta/models/cluster.py
@@ -377,10 +377,10 @@ def tendbcluster_ctl_primary_address(self) -> str:
         return ctl_address
 
     @classmethod
-    def get_cluster_stats(cls, cluster_types) -> dict:
+    def get_cluster_stats(cls, bk_biz_id, cluster_types) -> dict:
         cluster_stats = {}
         for cluster_type in cluster_types:
-            cluster_stats.update(json.loads(cache.get(f"{CACHE_CLUSTER_STATS}_{cluster_type}", "{}")))
+            cluster_stats.update(json.loads(cache.get(f"{CACHE_CLUSTER_STATS}_{bk_biz_id}_{cluster_type}", "{}")))
 
         return cluster_stats
 
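Note on the change above: capacity stats are now cached per business as well as per cluster type. A minimal sketch of the intended round-trip, assuming Django's cache backend and the CACHE_CLUSTER_STATS prefix from backend.constants (the bk_biz_id value, domain, and payload below are illustrative):

    import json

    from django.core.cache import cache

    from backend.constants import CACHE_CLUSTER_STATS

    bk_biz_id = 3  # illustrative business id
    # hypothetical payload, shaped like what the periodic task stores
    stats = {"spider.example.db": {"used": 1 << 30, "total": 4 << 30, "in_use": 25.0}}

    # writer side (periodic task): one cache entry per (bk_biz_id, cluster_type)
    cache.set(f"{CACHE_CLUSTER_STATS}_{bk_biz_id}_tendbcluster", json.dumps(stats))

    # reader side: get_cluster_stats merges the per-type entries for one business,
    # falling back to "{}" for types that have not been synced yet
    merged = {}
    for cluster_type in ["tendbha", "tendbcluster"]:
        merged.update(json.loads(cache.get(f"{CACHE_CLUSTER_STATS}_{bk_biz_id}_{cluster_type}", "{}")))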
65 changes: 41 additions & 24 deletions dbm-ui/backend/db_periodic_task/local_tasks/db_meta/constants.py
@@ -29,7 +29,7 @@
     "end_time": 1697101305,
     "slimit": 500,
     "down_sample_range": "1s",
-    # take only the latest few periods, to speed up the query
+    # take only the latest few periods, to speed up the query (if the metric data is discontinuous, nothing is returned)
     "type": "instant",
 }

@@ -38,7 +38,7 @@
         "range": 5,
         "used": """sum by (cluster_domain) (
             sum_over_time(
-            bkmonitor:exporter_dbm_redis_exporter:redis_memory_used_bytes{ instance_role="redis_master"}[1m]
+            bkmonitor:exporter_dbm_redis_exporter:redis_memory_used_bytes{instance_role="redis_master",%s}[1m]
             ))""",
         "total": """sum by (cluster_domain) (
             avg by (cluster_domain, bk_target_ip) (
@@ -51,50 +51,50 @@
         "range": 5,
         "used": """sum by (cluster_domain) (max by (cluster_domain,ip,mount_point) (
             max_over_time(
-            bkmonitor:exporter_dbm_redis_exporter:redis_datadir_df_used_mb{instance_role="redis_master"}[1m]
+            bkmonitor:exporter_dbm_redis_exporter:redis_datadir_df_used_mb{instance_role="redis_master",%s}[1m]
             ) * 1024 * 1024))""",
         "total": """sum by (cluster_domain) (max by (cluster_domain,ip,mount_point) (
             max_over_time(
-            bkmonitor:exporter_dbm_redis_exporter:redis_datadir_df_total_mb{instance_role="redis_master"}[1m]
+            bkmonitor:exporter_dbm_redis_exporter:redis_datadir_df_total_mb{instance_role="redis_master",%s}[1m]
             ) * 1024 * 1024))""",
     },
     ClusterType.TenDBSingle: {
-        "range": 15,
+        "range": 120,
         "used": """sum by (cluster_domain) (
             max_over_time(
-            bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_used_mb{instance_role="orphan"}[5m]
+            bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_used_mb{instance_role="orphan",%s}[5m]
             ) * 1024 * 1024 )""",
         "total": """max by (cluster_domain) (
             max_over_time(
-            bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_total_mb{instance_role="orphan"}[5m]
+            bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_total_mb{instance_role="orphan",%s}[5m]
             ) * 1024 * 1024 )""",
     },
     ClusterType.TenDBHA: {
-        "range": 15,
+        "range": 120,
         "used": """sum by (cluster_domain) (
             max by (cluster_domain, ip) (
                 max_over_time(
-                bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_used_mb{instance_role="backend_master"}[5m]
+                bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_used_mb{instance_role="backend_master",%s}[5m]
                 ) * 1024 * 1024
             ))""",
         "total": """sum by (cluster_domain) (
             max by (cluster_domain, ip) (
                 max_over_time(
-                bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_total_mb{instance_role="backend_master"}[5m]
+                bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_total_mb{instance_role="backend_master",%s}[5m]
                 ) * 1024 * 1024
             ))""",
     },
     ClusterType.TenDBCluster: {
-        "range": 5,
+        "range": 120,
         "used": """sum by (cluster_domain) (
             avg by (cluster_domain, ip) (
                 avg_over_time(
-                bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_used_mb{instance_role="remote_master"}[1m]
+                bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_used_mb{instance_role="remote_master",%s}[1m]
                 ) * 1024))""",
         "total": """sum by (cluster_domain) (
             avg by (cluster_domain, ip) (
                 avg_over_time(
-                bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_total_mb{instance_role="remote_master"}[1m]
+                bkmonitor:exporter_dbm_mysqld_exporter:mysql_datadir_df_total_mb{instance_role="remote_master",%s}[1m]
                 ) * 1024))""",
     },
     # the es exporter already has capacity metrics of its own (elasticsearch_filesystem_data_size_bytes, elasticsearch_indices_store_size_bytes)
@@ -106,15 +106,15 @@
             bkmonitor:dbm_system:disk:used{
                 device_type=~"ext.?|xfs",
                 instance_role=~"^(es_datanode_hot|es_datanode_cold)$",
-                mount_point!~"^(/|/usr/local)$"
+                mount_point!~"^(/|/usr/local)$",%s
             }[1m]
         ))""",
         "total": """sum by (cluster_domain) (
             sum_over_time(
                 bkmonitor:dbm_system:disk:total{
                     device_type=~"ext.?|xfs",
                     instance_role=~"^(es_datanode_hot|es_datanode_cold)$",
-                    mount_point!~"^(/|/usr/local)$"
+                    mount_point!~"^(/|/usr/local)$",%s
                 }[1m]
         ))""",
     },
@@ -123,15 +123,15 @@
         "used": """sum by (cluster_domain) (
             sum_over_time(
                 bkmonitor:dbm_system:disk:used{
-                    device_type=~"ext.?|xfs",instance_role="broker",mount_point!~"^(/|/usr/local)$"
+                    device_type=~"ext.?|xfs",instance_role="broker",mount_point!~"^(/|/usr/local)$",%s
                 }[1m]
         ))""",
         "total": """sum by (cluster_domain) (
             sum_over_time(
                 bkmonitor:dbm_system:disk:total{
                     device_type=~"ext.?|xfs",
                     instance_role="broker",
-                    mount_point!~"^(/|/usr/local)$"
+                    mount_point!~"^(/|/usr/local)$",%s
                 }[1m]
         ))""",
     },
@@ -142,36 +142,53 @@
             bkmonitor:dbm_system:disk:used{
                 device_type=~"ext.?|xfs",
                 instance_role="pulsar_bookkeeper",
-                mount_point!~"^(/|/usr/local)$"
+                mount_point!~"^(/|/usr/local)$",%s
             }[1m]
         ))""",
         "total": """sum by (cluster_domain) (
             sum_over_time(
                 bkmonitor:dbm_system:disk:total{
                     device_type=~"ext.?|xfs",
                     instance_role="pulsar_bookkeeper",
-                    mount_point!~"^(/|/usr/local)$"
+                    mount_point!~"^(/|/usr/local)$",%s
                 }[1m]
         ))""",
     },
     ClusterType.Hdfs: {
         "range": 5,
         "used": """avg by (cluster_domain) (
-            avg_over_time(bkmonitor:exporter_dbm_hdfs_exporter:hadoop_namenode_capacity_used[1m]))""",
+            avg_over_time(bkmonitor:exporter_dbm_hdfs_exporter:hadoop_namenode_capacity_used{%s}[1m]))""",
         "total": """avg by (cluster_domain) (
-            avg_over_time(bkmonitor:exporter_dbm_hdfs_exporter:hadoop_namenode_capacity_total[1m]))""",
+            avg_over_time(bkmonitor:exporter_dbm_hdfs_exporter:hadoop_namenode_capacity_total{%s}[1m]))""",
     },
     ClusterType.Influxdb: {
         "range": 5,
         "used": """max by (instance_host) (
-            max_over_time(bkmonitor:pushgateway_dbm_influxdb_bkpull:disk_used{path=~"^/data|/data1$"}[1m]))""",
+            max_over_time(bkmonitor:pushgateway_dbm_influxdb_bkpull:disk_used{path=~"^/data|/data1$",%s}[1m]))""",
         "total": """max by (instance_host) (
-            max_over_time(bkmonitor:pushgateway_dbm_influxdb_bkpull:disk_total{path=~"^/data|/data1$"}[1m]))""",
+            max_over_time(bkmonitor:pushgateway_dbm_influxdb_bkpull:disk_total{path=~"^/data|/data1$",%s}[1m]))""",
     },
     ClusterType.Dbmon: {
         "range": 5,
         "heartbeat": """
         avg by (target,bk_biz_id,app,bk_cloud_id, cluster_domain, cluster_type, instance_role)
-        (avg_over_time(custom:dbm_report_channel:redis_dbmon_heart_beat{{cluster_domain="{cluster_domain}"}}[1m]))""",
+        (avg_over_time(custom:dbm_report_channel:redis_dbmon_heart_beat{{cluster_domain="{cluster_domain}",%s}}[1m]))""",
     },
 }

+# cluster types that share the same query template
+SAME_QUERY_TEMPLATE_CLUSTER_TYPE_MAP = {
+    # Redis, memory-backed
+    ClusterType.TendisPredixyRedisCluster.value: ClusterType.TendisTwemproxyRedisInstance.value,
+    ClusterType.RedisInstance.value: ClusterType.TendisTwemproxyRedisInstance.value,
+    ClusterType.TendisRedisInstance.value: ClusterType.TendisTwemproxyRedisInstance.value,
+    ClusterType.TendisRedisCluster.value: ClusterType.TendisTwemproxyRedisInstance.value,
+    # Redis, disk-backed
+    ClusterType.TendisPredixyTendisplusCluster.value: ClusterType.TwemproxyTendisSSDInstance.value,
+    ClusterType.TendisTwemproxyTendisplusIns.value: ClusterType.TwemproxyTendisSSDInstance.value,
+    ClusterType.TendisTendisSSDInstance.value: ClusterType.TwemproxyTendisSSDInstance.value,
+    ClusterType.TendisTendisplusInsance.value: ClusterType.TwemproxyTendisSSDInstance.value,
+    ClusterType.TendisTendisplusCluster.value: ClusterType.TwemproxyTendisSSDInstance.value,
+}
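
A sketch of how these templates are consumed, based on the call sites visible later in this commit (the bk_biz_id value is illustrative): the cluster type is first normalized through SAME_QUERY_TEMPLATE_CLUSTER_TYPE_MAP so that all Redis variants resolve to one canonical template, and the %s slot in each PromQL string then receives a per-business label matcher:

    from backend.db_meta.enums import ClusterType
    from backend.db_periodic_task.local_tasks.db_meta.constants import (
        QUERY_TEMPLATE,
        SAME_QUERY_TEMPLATE_CLUSTER_TYPE_MAP,
    )

    bk_biz_id = 3  # illustrative business id
    cluster_type = ClusterType.TendisPredixyRedisCluster.value

    # Redis variants map to the type that owns the shared template; others pass through.
    canonical_type = SAME_QUERY_TEMPLATE_CLUSTER_TYPE_MAP.get(cluster_type, cluster_type)
    template = QUERY_TEMPLATE[canonical_type]

    # Each template carries one %s placeholder inside its metric selector;
    # filling it scopes the query to a single business.
    promql_used = template["used"] % f'bk_biz_id="{bk_biz_id}"'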
@@ -22,18 +22,23 @@
 from backend import env
 from backend.components import BKMonitorV3Api
 from backend.constants import CACHE_CLUSTER_STATS
-from backend.db_meta.enums import ClusterType, InstanceRole
-from backend.db_meta.models import Cluster, StorageInstance
-
-from ..register import register_periodic_task
-from .constants import QUERY_TEMPLATE, UNIFY_QUERY_PARAMS
+from backend.db_meta.enums import ClusterType
+from backend.db_meta.models import Cluster
+from backend.db_periodic_task.local_tasks import register_periodic_task
+from backend.db_periodic_task.local_tasks.db_meta.constants import (
+    QUERY_TEMPLATE,
+    SAME_QUERY_TEMPLATE_CLUSTER_TYPE_MAP,
+    UNIFY_QUERY_PARAMS,
+)
+from backend.db_periodic_task.utils import TimeUnit, calculate_countdown
 
 logger = logging.getLogger("celery")


-def query_cap(cluster_type, cap_key="used"):
+def query_cap(bk_biz_id, cluster_type, cap_key="used"):
     """Query one capacity figure (used/total) for clusters of a given type"""
 
+    cluster_type = SAME_QUERY_TEMPLATE_CLUSTER_TYPE_MAP.get(cluster_type, cluster_type)
     query_template = QUERY_TEMPLATE.get(cluster_type)
     if not query_template:
         logger.error("No query template for cluster type: %s", cluster_type)
@@ -44,20 +49,22 @@ def query_cap(cluster_type, cap_key="used"):
     start_time = end_time - datetime.timedelta(minutes=query_template["range"])
 
     params = copy.deepcopy(UNIFY_QUERY_PARAMS)
+
+    # mysql metrics are discontinuous; keeping "type": "instant" would make the query return nothing
+    if cluster_type in [ClusterType.TenDBSingle.value, ClusterType.TenDBHA.value, ClusterType.TenDBCluster.value]:
+        params.pop("type", "")
+
     params["bk_biz_id"] = env.DBA_APP_BK_BIZ_ID
     params["start_time"] = int(start_time.timestamp())
     params["end_time"] = int(end_time.timestamp())
-    # speed up the query
-    params["type"] = "instant"
 
-    params["query_configs"][0]["promql"] = query_template[cap_key]
+    params["query_configs"][0]["promql"] = query_template[cap_key] % f'bk_biz_id="{bk_biz_id}"'
     series = BKMonitorV3Api.unify_query(params)["series"]
 
     cluster_bytes = {}
     for serie in series:
         # clusters: cluster_domain | influxdb: instance_host
         cluster_domain = list(serie["dimensions"].values())[0]
+        # cluster_domain = serie["dimensions"]["cluster_domain"]
         datapoints = list(filter(lambda dp: dp[0] is not None, serie["datapoints"]))
 
         if not datapoints:
@@ -68,25 +75,25 @@ def query_cap(cluster_type, cap_key="used"):
     return cluster_bytes
 
 
-def query_cluster_capacity(cluster_type):
+def query_cluster_capacity(bk_biz_id, cluster_type):
     """Query cluster capacity"""
 
     cluster_cap_bytes = defaultdict(dict)
 
-    domains = (
-        list(Cluster.objects.filter(cluster_type=cluster_type).values_list("immute_domain", flat=True).distinct())
-        if cluster_type != ClusterType.Influxdb
-        else StorageInstance.objects.filter(instance_role=InstanceRole.INFLUXDB).values_list("machine__ip", flat=True)
+    domains = list(
+        Cluster.objects.filter(bk_biz_id=bk_biz_id, cluster_type=cluster_type)
+        .values_list("immute_domain", flat=True)
+        .distinct()
     )
 
-    used_data = query_cap(cluster_type, "used")
+    used_data = query_cap(bk_biz_id, cluster_type, "used")
     for cluster, used in used_data.items():
         # skip clusters that are no longer valid
         if cluster not in domains:
             continue
         cluster_cap_bytes[cluster]["used"] = used
 
-    total_data = query_cap(cluster_type, "total")
+    total_data = query_cap(bk_biz_id, cluster_type, "total")
     for cluster, used in total_data.items():
         # skip clusters that are no longer valid
         if cluster not in domains:
@@ -97,21 +104,17 @@ def sync_cluster_stat_by_cluster_type(cluster_type):
 
 
 @current_app.task
-def sync_cluster_stat_by_cluster_type(cluster_type):
+def sync_cluster_stat_by_cluster_type(bk_biz_id, cluster_type):
     """
     Sync each cluster's capacity stats, by cluster type
     """
-    logger.info("sync_cluster_stat_from_monitor started")
-    cluster_types = list(Cluster.objects.values_list("cluster_type", flat=True).distinct())
-    cluster_types.append(ClusterType.Influxdb.value)
-
-    cluster_stats = {}
     try:
-        cluster_capacity = query_cluster_capacity(cluster_type)
-        cluster_stats.update(cluster_capacity)
+        cluster_stats = query_cluster_capacity(bk_biz_id, cluster_type)
     except Exception as e:
         logger.error("query_cluster_capacity error: %s -> %s", cluster_type, e)
         return
 
     # compute the usage ratio
     for cluster, cap in cluster_stats.items():
@@ -120,18 +123,26 @@ def sync_cluster_stat_by_cluster_type(cluster_type):
             continue
         cap["in_use"] = round(cap["used"] * 100.0 / cap["total"], 2)
 
-    cache.set(f"{CACHE_CLUSTER_STATS}_{cluster_type}", json.dumps(cluster_stats))
+    cache.set(f"{CACHE_CLUSTER_STATS}_{bk_biz_id}_{cluster_type}", json.dumps(cluster_stats))
 
 
-@register_periodic_task(run_every=crontab(minute="*/10"))
+@register_periodic_task(run_every=crontab(hour="*/1"))
 def sync_cluster_stat_from_monitor():
     """
     Sync capacity stats of all clusters
     """
     logger.info("sync_cluster_stat_from_monitor started")
-    cluster_types = list(Cluster.objects.values_list("cluster_type", flat=True).distinct())
-    cluster_types.append(ClusterType.Influxdb.value)
-
-    for cluster_type in cluster_types:
-        sync_cluster_stat_by_cluster_type.apply_async(args=[cluster_type])
+    biz_cluster_types = Cluster.objects.values_list("bk_biz_id", "cluster_type").distinct()
+
+    count = len(biz_cluster_types)
+    for index, (bk_biz_id, cluster_type) in enumerate(biz_cluster_types):
+        countdown = calculate_countdown(count=count, index=index, duration=1 * TimeUnit.HOUR)
+        logger.info(
+            "{}_{} sync_cluster_stat_from_monitor will be run after {} seconds.".format(
+                bk_biz_id, cluster_type, countdown
+            )
+        )
+        sync_cluster_stat_by_cluster_type.apply_async(
+            kwargs={"bk_biz_id": bk_biz_id, "cluster_type": cluster_type}, countdown=countdown
+        )
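
calculate_countdown itself is not part of this diff; from the call site above it evidently spaces the per-(bk_biz_id, cluster_type) tasks across the one-hour scheduling window instead of firing them all at once. A hypothetical sketch of such a helper (the real one lives in backend.db_periodic_task.utils and may differ):

    # hypothetical sketch; names mirror the call site, not the actual implementation
    class TimeUnit:
        HOUR = 60 * 60  # seconds

    def calculate_countdown(count: int, index: int, duration: int) -> int:
        """Give task `index` of `count` an evenly spaced delay within `duration` seconds."""
        return (duration // max(count, 1)) * index

With 120 (biz, type) pairs in a one-hour window, task 0 runs immediately and each later task is delayed a further 30 seconds, so the monitoring API is queried at a steady rate.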
2 changes: 1 addition & 1 deletion dbm-ui/backend/db_services/dbbase/resources/query.py
@@ -504,7 +504,7 @@ def _filter_cluster_hook(
             cluster_operate_records_map=cluster_operate_records_map,
             cloud_info=cloud_info,
             biz_info=biz_info,
-            cluster_stats_map=Cluster.get_cluster_stats(cls.cluster_types),
+            cluster_stats_map=Cluster.get_cluster_stats(bk_biz_id, cls.cluster_types),
             **kwargs,
         )
         clusters.append(cluster_info)
