Skip to content

Commit

Permalink
fix(backend): mysql 临时账号巡检 #8648
Browse files Browse the repository at this point in the history
  • Loading branch information
ygcyao committed Dec 19, 2024
1 parent 784c639 commit 85b4a7f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
specific language governing permissions and limitations under the License.
"""
import logging
from collections import defaultdict
from typing import List

from backend.components import DRSApi
from backend.constants import IP_PORT_DIVIDER
Expand All @@ -28,13 +30,15 @@ class CheckExpiredJobUserForMysql(object):
2:在job账号获取到流程flow的root_id,判断root_id对应的flow是否已中断或者注销的状态,如果是,则账号定义为已过期
"""

def __init__(self, mysql_cluster_type: ClusterType):
if mysql_cluster_type not in [ClusterType.TenDBHA, ClusterType.TenDBCluster, ClusterType.TenDBSingle]:
raise Exception(
f"the cluster_type does not belong to the mysql cluster type range: cluster_type:[{mysql_cluster_type}]"
)
self.mysql_cluster_type = mysql_cluster_type
self.clusters = Cluster.objects.filter(cluster_type=mysql_cluster_type)
def __init__(self, mysql_cluster_types: List[ClusterType]):
for mysql_cluster_type in mysql_cluster_types:
if mysql_cluster_type not in [ClusterType.TenDBHA, ClusterType.TenDBCluster, ClusterType.TenDBSingle]:
raise Exception(f"Invalid cluster_type: expected one of [{mysql_cluster_type}] for MySQL.")
self.clusters = (
Cluster.objects.filter(cluster_type__in=mysql_cluster_types)
.prefetch_related("storageinstance_set")
.prefetch_related("proxyinstance_set__tendbclusterspiderext")
)

@staticmethod
def _get_storage_instance_for_cluster(cluster: Cluster):
Expand All @@ -58,30 +62,43 @@ def _get_storage_instance_for_cluster(cluster: Cluster):

return storage_instances + proxy_instances

def _get_job_users_for_cluster(self, cluster: Cluster) -> list:
def _get_job_users_for_cluster(self, clusters: list[Cluster]) -> list:
"""
遍历mysql集群列表,获取mysql实例中存在job随机账号
"""
instances = self._get_storage_instance_for_cluster(cluster=cluster)
# 初始化字典列表
cluster_instances_map = defaultdict(list)
instances_by_cloud_id = defaultdict(list)
resp_dict = defaultdict(list)
for cluster in clusters:
# 获取每个 cluster 的实例列表
instances = self._get_storage_instance_for_cluster(cluster=cluster)
cluster_instances_map[cluster.name] = instances
# 根据 cluster 的 bk_cloud_id 将实例添加到对应的列表中
instances_by_cloud_id[cluster.bk_cloud_id].extend(instances)
get_job_users_sql = f"select user,host from mysql.user where user like '{DBM_MYSQL_JOB_TMP_USER_PREFIX}%' "

resp = DRSApi.rpc(
{
"addresses": instances,
"cmds": ["set tc_admin = 0;", get_job_users_sql],
"force": True,
"bk_cloud_id": cluster.bk_cloud_id,
}
)

for info in resp:
if info["error_msg"]:
logger.error(f"get job_users failed in cluster [{cluster.name}] : [{info['error_msg']}]")
for cloud_id, instances in instances_by_cloud_id.items():
resp = DRSApi.rpc(
{
"addresses": instances,
"cmds": ["set tc_admin = 0;", get_job_users_sql],
"force": True,
"bk_cloud_id": cloud_id,
}
)
resp_dict[cloud_id] = resp
for info in resp:
if info["error_msg"]:
cluster_name = next(
(key for key, values in cluster_instances_map.items() if info["address"] in values), None
)
logger.error(f"get job_users failed in cluster [{cluster_name}] : [{info['error_msg']}]")

return resp
return resp_dict

@staticmethod
def _drop_expired_job_user_for_instance(cluster: Cluster, user_info: dict, address: str):
def _drop_expired_job_user_for_instance(cloud_id: int, user_info: dict, address: str):
"""
删除已过期的账号
统一用drop user 命令删除临时账号
Expand All @@ -98,48 +115,48 @@ def _drop_expired_job_user_for_instance(cluster: Cluster, user_info: dict, addre
"set session sql_log_bin = 1;",
],
"force": True,
"bk_cloud_id": cluster.bk_cloud_id,
"bk_cloud_id": cloud_id,
}
)
logger.info(f"drop user [{user_info['user']}@{user_info['host']} in instance : [{address}]")

return

def check_job_user_is_expired(self, cluster: Cluster):
def check_job_user_is_expired(self, clusters: list[Cluster]):
"""
判断账号是否过期
"""
resp = self._get_job_users_for_cluster(cluster=cluster)
for info in resp:
if info["cmd_results"] is None:
continue

for cmd_result in info["cmd_results"]:
if not cmd_result.get("table_data", None):
# 如果是空列表,则表示实例上没有job_user, 正常跳过处理。
resp_dict = self._get_job_users_for_cluster(clusters=clusters)
for cloud_id, infos in resp_dict.items():
for info in infos:
if info["cmd_results"] is None:
continue
else:
# 如果不是空,则逐个判断随机账号情况,判断已过期,则删除
for user_info in cmd_result.get("table_data"):
flow_rood_id = user_info["user"].replace(DBM_MYSQL_JOB_TMP_USER_PREFIX, "")
if Flow.objects.filter(
flow_obj_id=flow_rood_id,
status__in=[TicketFlowStatus.TERMINATED, TicketFlowStatus.REVOKED],
).exists() and user_info["host"] in ["localhost", info["address"].split(":")[0]]:
"""
如果对应的job_id存在,且状态已经是终止或者撤销状态,则认为单据已经停止,可删除临时账号
如果host不是localhost和local_ip, 则认为这不是dbm产生的临时账号
"""
self._drop_expired_job_user_for_instance(
cluster=cluster, user_info=user_info, address=info["address"]
)
else:
# 匹配不到,则认为running状态,不作处理
pass

for cmd_result in info["cmd_results"]:
if not cmd_result.get("table_data", None):
# 如果是空列表,则表示实例上没有job_user, 正常跳过处理。
continue
else:
# 如果不是空,则逐个判断随机账号情况,判断已过期,则删除
for user_info in cmd_result.get("table_data"):
flow_rood_id = user_info["user"].replace(DBM_MYSQL_JOB_TMP_USER_PREFIX, "")
if Flow.objects.filter(
flow_obj_id=flow_rood_id,
status__in=[TicketFlowStatus.TERMINATED, TicketFlowStatus.REVOKED],
).exists() and user_info["host"] in ["localhost", info["address"].split(":")[0]]:
"""
如果对应的job_id存在,且状态已经是终止或者撤销状态,则认为单据已经停止,可删除临时账号
如果host不是localhost和local_ip, 则认为这不是dbm产生的临时账号
"""
self._drop_expired_job_user_for_instance(
cloud_id=cloud_id, user_info=user_info, address=info["address"]
)
else:
# 匹配不到,则认为running状态,不作处理
pass

def do_check(self):
"""
遍历检查
检查账号是否过期
"""
for cluster in self.clusters:
self.check_job_user_is_expired(cluster=cluster)
self.check_job_user_is_expired(clusters=self.clusters)
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,10 @@ def check_expired_job_users_for_mysql():
mysql 临时账号巡检
每条凌晨6点执行
"""
# 单节点集群
CheckExpiredJobUserForMysql(mysql_cluster_type=ClusterType.TenDBSingle).do_check()

# HA集群
CheckExpiredJobUserForMysql(mysql_cluster_type=ClusterType.TenDBHA).do_check()

# TenDB Cluster集群
CheckExpiredJobUserForMysql(mysql_cluster_type=ClusterType.TenDBCluster).do_check()
# 单节点、HA、TenDB Cluster集群
CheckExpiredJobUserForMysql(
mysql_cluster_types=[ClusterType.TenDBSingle, ClusterType.TenDBHA, ClusterType.TenDBCluster]
).do_check()


@register_periodic_task(run_every=crontab(minute=00, hour=7))
Expand Down

0 comments on commit 85b4a7f

Please sign in to comment.