Skip to content

Commit

Permalink
fix(backend): redis备份巡检任务优化 #8627
Browse files Browse the repository at this point in the history
  • Loading branch information
ygcyao authored and iSecloud committed Dec 23, 2024
1 parent 631bf42 commit fd6d371
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
from datetime import datetime, timedelta
from typing import Any, List

from django.db.models import Q
from django.db.models import Prefetch, Q
from django.utils import timezone
from django.utils.translation import ugettext as _

from backend.components import DBConfigApi
from backend.components.dbconfig.constants import FormatType, LevelName
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, InstanceRole
from backend.db_meta.models import Cluster
from backend.db_meta.models import Cluster, StorageInstance, StorageInstanceTuple
from backend.db_report.enums import RedisBackupCheckSubType
from backend.db_report.models import RedisBackupCheckReport
from backend.db_services.redis.util import is_tendisplus_instance_type, is_tendisssd_instance_type
Expand All @@ -44,13 +44,29 @@ def _check_tendis_binlog_backup():
且binlog序号要连续
"""
failed_records = []
# 构建查询条件:tendisplus,ssd,cache,集群创建时间大于1天,巡检0点发起
query = (
Q(cluster_type=ClusterType.TendisPredixyTendisplusCluster)
| Q(cluster_type=ClusterType.TwemproxyTendisSSDInstance)
) & Q(create_at__lt=timezone.now() - timedelta(days=1))
# 遍历集群
for c in Cluster.objects.filter(query):

# 预取storanges关联的as_ejector对象
ejector_prefetch = Prefetch(
"as_ejector", queryset=StorageInstanceTuple.objects.select_related("receiver"), to_attr="ejector_tuples"
)

# 预取storages对象
storage_prefetch = Prefetch(
"storageinstance_set",
queryset=StorageInstance.objects.filter(instance_role=InstanceRole.REDIS_MASTER.value)
.select_related("machine")
.prefetch_related(ejector_prefetch),
to_attr="storages",
)

cluster_list = Cluster.objects.filter(query).prefetch_related(storage_prefetch)
for c in cluster_list:
logger.info("+===+++++=== start check {} binlog backup +++++===++++ ".format(c.immute_domain))
logger.info("+===+++++=== cluster type is: {} +++++===++++ ".format(c.cluster_type))
cluster_slave_instance = [] # 初始化集群slave列表,binlog只在slave上生成
Expand All @@ -63,8 +79,8 @@ def _check_tendis_binlog_backup():
kvstorecount = redis_config["kvstorecount"]
logger.info("+===+++++=== kvstorecount is: {} +++++===++++ ".format(kvstorecount))

for master_obj in c.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value):
slave_obj = master_obj.as_ejector.get().receiver
for master_obj in c.storages:
slave_obj = master_obj.ejector_tuples[0].receiver
current_datetime = timezone.now()
# 计算时间差
time_difference = current_datetime - slave_obj.create_at
Expand Down Expand Up @@ -95,15 +111,15 @@ def _check_tendis_binlog_backup():
if not bklogs:
msg = _("无法查找到在时间范围内{}-{},集群{}:{}的binlog备份日志").format(start_time, end_time, c.immute_domain, instance)
logger.error(msg)
binlog_backup_failed_record(c, instance, msg)
failed_records.append(create_binlog_failed_record(c, instance, msg))
continue
logger.info(_("+===+++++=== {} 集群{} 实例维度日志不为空 +++++===++++ ".format(c.immute_domain, instance)))
for bklog in bklogs:
# 失败的记录,直接记录:同一条记录,不可能又存在失败的,又存在成功的,所以有失败的直接入库了
if bklog.get("backup_status", "") == "to_backup_system_failed":
logger.error("+===+++++=== to_backup_system_failed bklog: {} +++++===++++ ".format(bklog))
msg = bklog["backup_status_info"]
binlog_backup_failed_record(c, instance, msg)
failed_records.append(create_binlog_failed_record(c, instance, msg))

# 对成功的进行处理,判断文件序号的完备性
if bklog.get("backup_status", "") == "to_backup_system_success":
Expand All @@ -126,7 +142,7 @@ def _check_tendis_binlog_backup():
len(bin_index_list), bin_index_list
)
logger.error("+===+++++=== {}+++++===++++ ".format(msg))
binlog_backup_failed_record(c, instance, msg)
failed_records.append(create_binlog_failed_record(c, instance, msg))
# else:
# 这里打日志的话,有助于排查问题,但是日志有点多
# logger.info("+===+++++=== {} binlog 序号连续 +++++===++++ ".format(kvstore_filter))
Expand All @@ -140,17 +156,20 @@ def _check_tendis_binlog_backup():
len(bin_index_list), bin_index_list
)
logger.error("+===+++++=== {}+++++===++++ ".format(msg))
binlog_backup_failed_record(c, instance, msg)
failed_records.append(create_binlog_failed_record(c, instance, msg))
else:
logger.info(_("+===+++++=== {} binlog 序号连续 +++++===++++ ".format(instance)))

# 批量插入binlog备份失败记录
RedisBackupCheckReport.objects.bulk_create(failed_records)


def binlog_backup_failed_record(c: Cluster, instance: str, msg: str):
def create_binlog_failed_record(c: Cluster, instance: str, msg: str):
"""
记录binlog备份失败的集群和实例
创建binlog备份失败的集群和实例
"""
logger.info(_("+===++=== 实例{}binlog备份失败,集群类型{}写入表 ++++++++ ".format(instance, c.cluster_type)))
RedisBackupCheckReport.objects.create(
logger.info(_("+===++=== 实例{}binlog备份失败,集群类型{}写入记录 ++++++++ ".format(instance, c.cluster_type)))
return RedisBackupCheckReport(
creator=c.creator,
bk_biz_id=c.bk_biz_id,
bk_cloud_id=c.bk_cloud_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
from collections import defaultdict
from datetime import datetime, timedelta

from django.db.models import Q
from django.db.models import Prefetch, Q
from django.utils import timezone
from django.utils.translation import ugettext as _

from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, InstanceRole
from backend.db_meta.models import Cluster
from backend.db_meta.models import Cluster, StorageInstance, StorageInstanceTuple
from backend.db_report.enums import RedisBackupCheckSubType
from backend.db_report.models import RedisBackupCheckReport

Expand Down Expand Up @@ -60,6 +60,7 @@ def _check_tendis_full_backup():
"""
删除时间大于60天的记录,全备份和binlog都是同一张表,这里操作就好
"""
failed_records = []
RedisBackupCheckReport.objects.filter(create_at__lte=timezone.now() - timedelta(days=60)).delete()

# 构建查询条件:tendisplus,ssd,cache,集群创建时间大于1天,巡检0点发起
Expand All @@ -68,8 +69,22 @@ def _check_tendis_full_backup():
| Q(cluster_type=ClusterType.TwemproxyTendisSSDInstance)
| Q(cluster_type=ClusterType.TendisTwemproxyRedisInstance)
) & Q(create_at__lt=timezone.now() - timedelta(days=1))
# 遍历集群
for c in Cluster.objects.filter(query):

# 预取storanges关联的as_ejector对象
ejector_prefetch = Prefetch(
"as_ejector", queryset=StorageInstanceTuple.objects.select_related("receiver"), to_attr="ejector_tuples"
)

# 预取storages对象
storage_prefetch = Prefetch(
"storageinstance_set",
queryset=StorageInstance.objects.filter(instance_role=InstanceRole.REDIS_MASTER.value)
.select_related("machine")
.prefetch_related(ejector_prefetch),
to_attr="storages",
)
cluster_list = Cluster.objects.filter(query).prefetch_related(storage_prefetch)
for c in cluster_list:
logger.info("+===+++++=== start check {} full backup +++++===++++ ".format(c.immute_domain))
logger.info("+===+++++=== cluster type is: {} +++++===++++ ".format(c.cluster_type))
cluster_slave_instance = [] # 初始化集群slave列表
Expand All @@ -78,8 +93,8 @@ def _check_tendis_full_backup():
bklog_success_instance_count = {} # 初始化字典:节点和对应的备份次数
slave_ins_map = defaultdict() # 主从节点对应关系

for master_obj in c.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value):
slave_obj = master_obj.as_ejector.get().receiver
for master_obj in c.storages:
slave_obj = master_obj.ejector_tuples[0].receiver
current_datetime = timezone.now()
# 计算时间差
time_difference = current_datetime - slave_obj.create_at
Expand Down Expand Up @@ -118,7 +133,7 @@ def _check_tendis_full_backup():
msg = _("无法查找到在时间范围内{}-{},集群{}的全备份日志").format(start_time, end_time, c.immute_domain)
logger.error(msg)
instance = "all instance"
full_backup_failed_record(c, instance, msg)
failed_records.append(create_full_failed_record(c, instance, msg))
continue

logger.info(_("+===+++++=== {} 集群维度日志不为空 +++++===++++ ".format(c.immute_domain)))
Expand All @@ -128,7 +143,7 @@ def _check_tendis_full_backup():
logger.error("+===+++++=== to_backup_system_failed bklog: {} +++++===++++ ".format(bklog))
msg = bklog["backup_status_info"]
instance = bklog["redis_ip"] + IP_PORT_DIVIDER + str(bklog["redis_port"])
full_backup_failed_record(c, instance, msg)
failed_records.append(create_full_failed_record(c, instance, msg))

# 对成功的进行处理
if bklog.get("backup_status", "") == "to_backup_system_success":
Expand Down Expand Up @@ -162,15 +177,18 @@ def _check_tendis_full_backup():
instance, count, master_instance, master_backup_count, expect_count
)
# 记录备份失败的集群和实例
full_backup_failed_record(c, instance, msg)
failed_records.append(create_full_failed_record(c, instance, msg))

# 批量插入备份失败记录
RedisBackupCheckReport.objects.bulk_create(failed_records)


def full_backup_failed_record(c: Cluster, instance: str, msg: str):
def create_full_failed_record(c, instance, msg):
"""
记录全备备份失败的集群和实例
创建全备备份失败记录对象
"""
logger.info(_("+===++=== 实例{}全备份失败,集群类型{}写入表 ++++++++ ".format(instance, c.cluster_type)))
RedisBackupCheckReport.objects.create(
logger.info(_("+===++=== 实例{}全备份失败,集群类型{}写入记录 ++++++++ ".format(instance, c.cluster_type)))
return RedisBackupCheckReport(
creator=c.creator,
bk_biz_id=c.bk_biz_id,
bk_cloud_id=c.bk_cloud_id,
Expand Down

0 comments on commit fd6d371

Please sign in to comment.