
fix: fix several mysql/spider ticket bugs #7297
yksitu authored and iSecloud committed Oct 18, 2024
1 parent 52cc8d0 commit 07c0700
Showing 6 changed files with 95 additions and 33 deletions.
@@ -165,6 +165,18 @@ def master_fail_over_flow(self):
# Stage 3: concurrent change-master atomic tasks; if the cluster has additional slave nodes, all remaining slaves sync data from the new master

if cluster["other_slave_info"]:
cluster_switch_sub_pipeline.add_act(
act_name=_("其余slave下发db-actuator介质"),
act_component_code=TransFileComponent.code,
kwargs=asdict(
DownloadMediaKwargs(
bk_cloud_id=info["slave_ip"]["bk_cloud_id"],
exec_ip=cluster["other_slave_info"],
file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
)
),
)

acts_list = []
for exec_ip in cluster["other_slave_info"]:
cluster_sw_kwargs.exec_ip = exec_ip
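The hunk above is truncated right where the per-slave acts are assembled. For orientation only, a sketch of the fan-out pattern these switch flows typically use; the act label, ExecuteDBActuatorScriptComponent and the add_parallel_acts call are assumptions, not the elided code itself:

# Illustrative sketch only; names below that are not visible in the hunk are assumed.
acts_list = []
for exec_ip in cluster["other_slave_info"]:
    cluster_sw_kwargs.exec_ip = exec_ip
    acts_list.append(
        {
            "act_name": _("change master to the new master"),  # hypothetical label
            "act_component_code": ExecuteDBActuatorScriptComponent.code,  # assumed component
            "kwargs": asdict(cluster_sw_kwargs),
        }
    )
cluster_switch_sub_pipeline.add_parallel_acts(acts_list=acts_list)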
@@ -204,7 +204,7 @@ def master_slave_switch_flow(self):
kwargs=asdict(
DownloadMediaKwargs(
bk_cloud_id=info["slave_ip"]["bk_cloud_id"],
exec_ip=info["slave_ip"]["ip"],
exec_ip=[info["slave_ip"]["ip"], info["master_ip"]["ip"]],
file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
)
),
@@ -309,6 +309,20 @@ def master_slave_switch_flow(self):
)

# Stage 4: concurrent change-master atomic tasks; all slave nodes in the cluster sync data from the new master
# Additionally deliver db-actuator to the remaining slaves here; file conflicts may possibly occur
if cluster["other_slave_info"]:
cluster_switch_sub_pipeline.add_act(
act_name=_("其余slave下发db-actuator介质"),
act_component_code=TransFileComponent.code,
kwargs=asdict(
DownloadMediaKwargs(
bk_cloud_id=info["slave_ip"]["bk_cloud_id"],
exec_ip=cluster["other_slave_info"],
file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
)
),
)

acts_list = []

for exec_ip in [info["master_ip"]["ip"]] + cluster["other_slave_info"]:
@@ -14,14 +14,16 @@

from django.utils.translation import ugettext as _

from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import TenDBClusterSpiderRole
from backend.db_meta.models import Cluster
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.spider.common.common_sub_flow import reduce_spider_slaves_flow
from backend.flow.plugins.components.collections.common.pause import PauseComponent
from backend.flow.plugins.components.collections.mysql.check_client_connections import CheckClientConnComponent
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
from backend.flow.plugins.components.collections.spider.drop_spider_ronting import DropSpiderRoutingComponent
from backend.flow.utils.mysql.mysql_act_dataclass import RecycleDnsRecordKwargs
from backend.flow.utils.mysql.mysql_act_dataclass import CheckClientConnKwargs, RecycleDnsRecordKwargs
from backend.flow.utils.spider.spider_act_dataclass import DropSpiderRoutingKwargs

logger = logging.getLogger("flow")
@@ -63,6 +65,22 @@ def reduce_spider_mnt(self):

sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context))

# Pre-check
if self.data["is_safe"]:
sub_pipeline.add_act(
act_name=_("检测回收Spider端连接情况"),
act_component_code=CheckClientConnComponent.code,
kwargs=asdict(
CheckClientConnKwargs(
bk_cloud_id=cluster.bk_cloud_id,
check_instances=[
f"{i['ip']}{IP_PORT_DIVIDER}{cluster.proxyinstance_set.first().port}"
for i in reduce_spiders
],
)
),
)

# Remove the spider's routing relations
sub_pipeline.add_act(
act_name=_("删除spider的路由关系"),
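CheckClientConnKwargs is imported from backend.flow.utils.mysql.mysql_act_dataclass above, but its definition is not part of this diff. A minimal sketch of such a payload, assuming it carries only the two fields used here (the real dataclass may define more):

from dataclasses import dataclass, field
from typing import List


@dataclass()
class CheckClientConnKwargs:
    """Illustrative sketch of the payload consumed by CheckClientConnComponent."""

    bk_cloud_id: int  # cloud area of the spider instances being probed
    check_instances: List[str] = field(default_factory=list)  # "ip:port" strings to check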
@@ -15,6 +15,7 @@

from django.utils.translation import ugettext as _

from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import TenDBClusterSpiderRole
from backend.db_meta.exceptions import ClusterNotExistException
from backend.db_meta.models import Cluster
@@ -23,9 +24,10 @@
from backend.flow.engine.bamboo.scene.spider.common.common_sub_flow import reduce_spider_slaves_flow
from backend.flow.engine.bamboo.scene.spider.common.exceptions import NormalSpiderFlowException
from backend.flow.plugins.components.collections.common.pause import PauseComponent
from backend.flow.plugins.components.collections.mysql.check_client_connections import CheckClientConnComponent
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
from backend.flow.plugins.components.collections.spider.drop_spider_ronting import DropSpiderRoutingComponent
from backend.flow.utils.mysql.mysql_act_dataclass import RecycleDnsRecordKwargs
from backend.flow.utils.mysql.mysql_act_dataclass import CheckClientConnKwargs, RecycleDnsRecordKwargs
from backend.flow.utils.spider.spider_act_dataclass import DropSpiderRoutingKwargs

logger = logging.getLogger("flow")
@@ -139,6 +141,22 @@ def reduce_spider_nodes(self):
# 启动子流程
sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(sub_flow_context))

# Pre-check
if self.data["is_safe"]:
sub_pipeline.add_act(
act_name=_("检测回收Spider端连接情况"),
act_component_code=CheckClientConnComponent.code,
kwargs=asdict(
CheckClientConnKwargs(
bk_cloud_id=cluster.bk_cloud_id,
check_instances=[
f"{i['ip']}{IP_PORT_DIVIDER}{cluster.proxyinstance_set.first().port}"
for i in reduce_spiders
],
)
),
)

# Remove the spider's routing relations
sub_pipeline.add_act(
act_name=_("删除spider的路由关系"),
@@ -14,32 +14,9 @@
from backend.db_meta.models import Cluster, ProxyInstance
from backend.flow.engine.bamboo.scene.spider.common.exceptions import DropSpiderNodeFailedException
from backend.flow.plugins.components.collections.common.base_service import BaseService
from backend.flow.utils.mysql.check_client_connections import check_client_connection


class DropSpiderRoutingService(BaseService):
def _pre_check(self, cluster: Cluster, reduce_spider: ProxyInstance):
"""
Check whether the spider nodes to be removed still have active connections
"""

res = check_client_connection(bk_cloud_id=cluster.bk_cloud_id, instances=[reduce_spider.ip_port])

if res[0]["error_msg"]:
raise DropSpiderNodeFailedException(message=_("select processlist failed: {}".format(res[0]["error_msg"])))

if res[0]["cmd_results"][0]["table_data"]:
self.log_error(f"There are also {len(res[0]['cmd_results'][0]['table_data'])} non-sleep state threads")
process_list = res[0]["cmd_results"][0]["table_data"]
for p in process_list:
# Log each connection
self.log_error(
f"proc_id: {p['ID']}, command:{p['COMMAND']}, host:{p['HOST']}, info:{p['INFO']}, time:{p['TIME']}"
)
return False

return True

def _exec_drop_routing(self, cluster: Cluster, spider: ProxyInstance):
"""
执行删除节点路由逻辑
@@ -91,18 +68,12 @@ def _execute(self, data, parent_data):
kwargs = data.get_one_of_inputs("kwargs")

reduce_spiders = kwargs["reduce_spiders"]
is_safe = kwargs["is_safe"]
cluster = Cluster.objects.get(id=kwargs["cluster_id"])

for spider in reduce_spiders:
# A spider machine is dedicated to a single cluster: one instance per machine
s = cluster.proxyinstance_set.get(machine__ip=spider["ip"])

if is_safe:
if not self._pre_check(cluster, s):
# Exit if the check does not pass
return False

# Execute the routing removal
self.log_info(f"exec drop node [{s.ip_port}]")
self._exec_drop_routing(cluster, s)
@@ -11,7 +11,8 @@

from pipeline.component_framework.component import Component

from backend.components import DBPrivManagerApi
from backend.components import DBPrivManagerApi, DRSApi
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.models import Cluster
from backend.flow.consts import TDBCTL_USER
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptService
@@ -28,6 +29,27 @@ class RemoteMigrateCutOverService(ExecuteDBActuatorScriptService):
2: Execute the db_actor command for the paired switchover
"""

def _drop_tdbctl_user(self, instances: list, ctl_primary: str, bk_cloud_id: int):
"""
Try to drop the built-in account on the new nodes; errors returned here are not handled
@param instances: instances to operate on
@param ctl_primary: cluster central control (ctl primary) info
@param bk_cloud_id: cloud area of the new nodes
"""
# Drop the spider account if it already exists
rpc_params = {
"addresses": instances,
"cmds": [
f"drop user '{TDBCTL_USER}'@'{ctl_primary.split(IP_PORT_DIVIDER)[0]}'",
],
"force": False,
"bk_cloud_id": bk_cloud_id,
}
# Remote request via the DRS service
res = DRSApi.rpc(rpc_params)
self.log_info(res)
return

def _execute(self, data, parent_data, callback=None) -> bool:
global_data = data.get_one_of_inputs("global_data")

@@ -48,6 +70,13 @@ def _execute(self, data, parent_data, callback=None) -> bool:
"hosts": [ctl_primary.split(":")[0]],
}

# Try to drop the built-in account
self._drop_tdbctl_user(
instances=[i[key] for i in global_data["migrate_tuples"] for key in ["new_master", "new_slave"]],
ctl_primary=ctl_primary,
bk_cloud_id=cluster.bk_cloud_id,
)

for info in global_data["migrate_tuples"]:
params["address"] = info["new_master"]
DBPrivManagerApi.add_priv_without_account_rule(params=params)
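The new _drop_tdbctl_user helper logs the DRS response and returns without checking it, as its docstring notes. Purely for illustration, a hypothetical helper that would surface failures, assuming each response entry is a dict carrying an "error_msg" field like the check_client_connection result structure removed earlier in this commit:

from backend.components import DRSApi


def drop_user_succeeded(rpc_params: dict) -> bool:
    """Hypothetical helper (not part of the commit): True if no DRS entry reports an error."""
    res = DRSApi.rpc(rpc_params)
    return not any(item.get("error_msg") for item in res)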
