Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(backend): 资源池支持跨组的同园区资源申请 #8919 #8920

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dbm-ui/backend/db_services/dbbase/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ def query_cluster_stat(self, request, *args, **kwargs):

cluster_domain_qs = Cluster.objects.filter(bk_biz_id=3).values("immute_domain", "id")
cluster_domain_map = {cluster["immute_domain"]: cluster["id"] for cluster in cluster_domain_qs}
cluster_stat_map = {cluster_domain_map[domain]: cap for domain, cap in cluster_stat_map.items()}
cluster_stat_map = {
cluster_domain_map[domain]: cap for domain, cap in cluster_stat_map.items() if domain in cluster_domain_map
}

return Response(cluster_stat_map)

Expand Down
3 changes: 0 additions & 3 deletions dbm-ui/backend/env/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,5 @@
WEBCONSOLE_USERNAME = get_type_env(key="WEBCONSOLE_USERNAME", _type=str, default="")
WEBCONSOLE_PASSWORD = get_type_env(key="WEBCONSOLE_PASSWORD", _type=str, default="")

# 资源池伪造开关
FAKE_RESOURCE_APPLY_ENABLE = get_type_env(key="FAKE_RESOURCE_APPLY_ENABLE", _type=bool, default=False)

# 跳过审批开关,默认关闭,方便本地联调
ITSM_FLOW_SKIP = get_type_env(key="ITSM_FLOW_SKIP", _type=bool, default=False)
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@
export DRS_WEBCONSOLE_PASSWORD="{{drs_webconsole_password}}"
export DRS_PORT={{drs_port}}
export DRS_LOG_JSON=true # 是否使用 json 格式日志
export DRS_LOG_CONSOLE=true # 是否在 stdout 打印日志
export DRS_LOG_CONSOLE=false # 是否在 stdout 打印日志
export DRS_LOG_DEBUG=true # 启用 debug 日志级别
export DRS_KEY_FILE="/home/mysql/db-remote-service/server.key"


# 容器环境不要使用
export DRS_TMYSQLPARSER_BIN="tmysqlparse"
export DRS_LOG_FILE=test.log # 是否在文件打印日志, 文件路径
export DRS_LOG_FILE_ROTATE_SIZE=10 # rotate 大小, MB
export DRS_LOG_FILE_ROTATE_SIZE=20 # rotate 大小, MB
export DRS_LOG_FILE_MAX_BACKUP=5 # 旧日志保留数
export DRS_LOG_FILE_MAX_AGE=5 # 过期天数
export DRS_CA_FILE=/home/mysql/db-remote-service/server.crt
Expand Down
13 changes: 11 additions & 2 deletions dbm-ui/backend/iam_app/dataclass/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from ..exceptions import BaseIAMError
from ..handlers.client import IAM
from ..handlers.permission import Permission
from .actions import _all_actions
from .actions import ActionEnum, _all_actions
from .resources import ResourceEnum, ResourceMeta, _all_resources, _extra_instance_selections

logger = logging.getLogger("root")
Expand Down Expand Up @@ -221,7 +221,16 @@ def assign_auth_to_group(iam: IAM, biz: AppCache, group_id):
"""
给单个用户组分配权限,这里的权限固定是DBA权限
"""
biz_actions = [action for action in _all_actions.values() if action.group not in [_("全局设置"), _("资源管理")]]
global_action_groups = [
ActionEnum.GLOBAL_MANAGE.group,
ActionEnum.RESOURCE_MANAGE.group,
ActionEnum.PLATFORM_MANAGE.group,
]
biz_actions = [
action
for action in _all_actions.values()
if action.group not in global_action_groups and action.related_resource_types
]
auth_contents = generate_resource_topo_auth(biz_actions, bk_biz_id=biz.bk_biz_id, bk_biz_name=biz.bk_biz_name)
for auth_info in auth_contents:
ok, message, data = iam._client.grant_user_group_actions(env.BK_IAM_SYSTEM_ID, group_id, data=auth_info)
Expand Down
13 changes: 13 additions & 0 deletions dbm-ui/backend/ticket/builders/mysql/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import datetime
import re
from typing import Any, Dict, List, Union

Expand All @@ -18,6 +19,7 @@
from backend.configuration.constants import DBType
from backend.db_meta.enums import AccessLayer, ClusterDBHAStatusFlags, ClusterType, InstanceInnerRole
from backend.db_meta.models.cluster import Cluster, ClusterPhase
from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler
from backend.flow.consts import SYSTEM_DBS
from backend.flow.utils.mysql.db_table_filter.exception import DbTableFilterValidateException
from backend.flow.utils.mysql.db_table_filter.tools import glob_check
Expand Down Expand Up @@ -166,6 +168,17 @@ def validate_slave_is_stand_by(self, attrs):
slave_insts = [f"{info['slave_ip']['ip']}" for info in attrs["infos"]]
CommonValidate.validate_slave_is_stand_by(slave_insts)

def validated_cluster_latest_backup(self, cluster_ids, backup_source, backup_type=None):
"""校验集群是否具有最近一次备份日志"""
now = datetime.datetime.now(datetime.timezone.utc)
for cluster_id in cluster_ids:
handler = FixPointRollbackHandler(cluster_id=cluster_id)
backup = handler.query_latest_backup_log(rollback_time=now, backup_source=backup_source)
if not backup:
raise serializers.ValidationError(_("集群{}无法找到最近一次备份").format(cluster_id))
if backup_type and backup["backup_type"] != backup_type:
raise serializers.ValidationError(_("集群{}最近一次备份类型不匹配{}").format(cluster_id, backup_type))

def validate(self, attrs):
# 默认全局校验只需要校验集群的状态
self.validate_cluster_can_access(attrs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from backend.db_services.dbbase.constants import IpSource
from backend.flow.engine.controller.mysql import MySQLController
from backend.ticket import builders
from backend.ticket.builders.common.base import BaseOperateResourceParamBuilder, HostInfoSerializer
from backend.ticket.builders.common.base import BaseOperateResourceParamBuilder, HostInfoSerializer, fetch_cluster_ids
from backend.ticket.builders.common.constants import MySQLBackupSource
from backend.ticket.builders.mysql.base import MySQLBaseOperateDetailSerializer
from backend.ticket.builders.mysql.mysql_master_slave_switch import (
Expand All @@ -43,10 +43,15 @@ class MigrateClusterInfoSerializer(serializers.Serializer):
is_safe = serializers.BooleanField(help_text=_("安全模式"), default=True)

def validate(self, attrs):
cluster_ids = fetch_cluster_ids(attrs)

# 校验集群是否可用,集群类型为高可用
super().validate_cluster_can_access(attrs)
super().validated_cluster_type(attrs, ClusterType.TenDBHA)

# 校验集群存在最近一次全备
super().validated_cluster_latest_backup(cluster_ids, attrs["backup_source"])

if attrs["ip_source"] == IpSource.RESOURCE_POOL:
return attrs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import datetime
import itertools

from django.utils.translation import gettext_lazy as _
Expand All @@ -18,7 +17,6 @@
from backend.db_meta.enums import ClusterType, InstanceRole
from backend.db_meta.models import Cluster
from backend.db_services.dbbase.constants import IpSource
from backend.db_services.mysql.fixpoint_rollback.handlers import FixPointRollbackHandler
from backend.flow.consts import MySQLBackupTypeEnum
from backend.flow.engine.controller.mysql import MySQLController
from backend.ticket import builders
Expand Down Expand Up @@ -66,13 +64,8 @@ def validate(self, attrs):
super(MysqlMigrateUpgradeDetailSerializer, self).validated_cluster_type(attrs, ClusterType.TenDBHA)

# 校验集群最近一次备份记录是逻辑备份
now = datetime.datetime.now(datetime.timezone.utc)
cluster_ids = fetch_cluster_ids(attrs)
for cluster in cluster_ids:
handler = FixPointRollbackHandler(cluster_id=cluster)
backup = handler.query_latest_backup_log(rollback_time=now, backup_source=attrs["backup_source"])
if not backup or backup["backup_type"] != MySQLBackupTypeEnum.LOGICAL:
raise serializers.ValidationError(_("集群{}无法找到最近一次备份,或最近一次备份不为逻辑备份").format(cluster))
super().validated_cluster_latest_backup(cluster_ids, attrs["backup_source"], MySQLBackupTypeEnum.LOGICAL)

if attrs["ip_source"] == IpSource.RESOURCE_POOL:
return attrs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from backend.db_meta.enums import ClusterType, InstanceInnerRole
from backend.flow.engine.controller.mysql import MySQLController
from backend.ticket import builders
from backend.ticket.builders.common.base import InstanceInfoSerializer
from backend.ticket.builders.common.base import InstanceInfoSerializer, fetch_cluster_ids
from backend.ticket.builders.common.constants import MySQLBackupSource
from backend.ticket.builders.mysql.base import BaseMySQLHATicketFlowBuilder, MySQLBaseOperateDetailSerializer
from backend.ticket.constants import TicketType
Expand All @@ -31,6 +31,8 @@ class SlaveInfoSerializer(serializers.Serializer):
force = serializers.BooleanField(help_text=_("是否强制执行"), required=False, default=False)

def validate(self, attrs):
cluster_ids = fetch_cluster_ids(attrs)

# 校验集群是否可用,集群类型为高可用
super(MysqlRestoreLocalSlaveDetailSerializer, self).validate_cluster_can_access(attrs)
super(MysqlRestoreLocalSlaveDetailSerializer, self).validated_cluster_type(attrs, ClusterType.TenDBHA)
Expand All @@ -45,6 +47,11 @@ def validate(self, attrs):
attrs, instance_key=["slave"], cluster_key=["cluster_id"], role=InstanceInnerRole.SLAVE
)

# 校验集群存在最近一次全备
super(MysqlRestoreLocalSlaveDetailSerializer, self).validated_cluster_latest_backup(
cluster_ids, attrs["backup_source"]
)

return attrs


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from backend.db_meta.enums import ClusterType, InstanceInnerRole
from backend.flow.engine.controller.mysql import MySQLController
from backend.ticket import builders
from backend.ticket.builders.common.base import HostInfoSerializer, InstanceInfoSerializer
from backend.ticket.builders.common.base import HostInfoSerializer, InstanceInfoSerializer, fetch_cluster_ids
from backend.ticket.builders.common.constants import MySQLBackupSource
from backend.ticket.builders.mysql.base import BaseMySQLHATicketFlowBuilder, MySQLBaseOperateDetailSerializer
from backend.ticket.constants import TicketType
Expand All @@ -31,6 +31,8 @@ class RestoreInfoSerializer(serializers.Serializer):
infos = serializers.ListField(help_text=_("集群重建信息"), child=RestoreInfoSerializer())

def validate(self, attrs):
cluster_ids = fetch_cluster_ids(attrs)

# 校验集群是否可用,集群类型为高可用
super(MysqlRestoreSlaveDetailSerializer, self).validate_cluster_can_access(attrs)
super(MysqlRestoreSlaveDetailSerializer, self).validated_cluster_type(attrs, ClusterType.TenDBHA)
Expand All @@ -50,6 +52,11 @@ def validate(self, attrs):
attrs, host_key=["new_slave"], cluster_key=["cluster_ids"]
)

# 校验集群存在最近一次全备
super(MysqlRestoreSlaveDetailSerializer, self).validated_cluster_latest_backup(
cluster_ids, attrs["backup_source"]
)

return attrs


Expand Down
Binary file modified dbm-ui/backend/ticket/exclusive_ticket.xlsx
Binary file not shown.
16 changes: 0 additions & 16 deletions dbm-ui/backend/ticket/flow_manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

from django.db import transaction

from backend import env
from backend.core import notify
from backend.ticket import constants
from backend.ticket.constants import FLOW_FINISHED_STATUS, FlowType, TicketStatus
Expand All @@ -21,10 +20,6 @@
from backend.ticket.flow_manager.itsm import ItsmFlow
from backend.ticket.flow_manager.pause import PauseFlow
from backend.ticket.flow_manager.resource import (
FakeResourceApplyFlow,
FakeResourceBatchApplyFlow,
FakeResourceBatchDeliveryFlow,
FakeResourceDeliveryFlow,
ResourceApplyFlow,
ResourceBatchApplyFlow,
ResourceBatchDeliveryFlow,
Expand All @@ -48,17 +43,6 @@
FlowType.RESOURCE_BATCH_DELIVERY: ResourceBatchDeliveryFlow,
}

# 开启无资源池环境调试,从空闲机筛选机器伪造资源返回
if env.FAKE_RESOURCE_APPLY_ENABLE:
SUPPORTED_FLOW_MAP.update(
{
FlowType.RESOURCE_APPLY: FakeResourceApplyFlow,
FlowType.RESOURCE_DELIVERY: FakeResourceDeliveryFlow,
FlowType.RESOURCE_BATCH_APPLY: FakeResourceBatchApplyFlow,
FlowType.RESOURCE_BATCH_DELIVERY: FakeResourceBatchDeliveryFlow,
}
)

logger = logging.getLogger("root")


Expand Down
92 changes: 7 additions & 85 deletions dbm-ui/backend/ticket/flow_manager/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@
from collections import defaultdict
from typing import Any, Dict, List, Optional, Union

from django.core.cache import cache
from django.utils.translation import gettext as _

from backend import env
from backend.components.dbresource.client import DBResourceApi
from backend.configuration.constants import AffinityEnum
from backend.configuration.models import DBAdministrator
from backend.core import notify
from backend.db_meta.models import Spec
from backend.db_services.dbresource.exceptions import ResourceApplyException, ResourceApplyInsufficientException
from backend.db_services.ipchooser.constants import CommonEnum
from backend.db_services.ipchooser.query.resource import ResourceQueryHelper
from backend.ticket import constants
from backend.ticket.constants import FlowCallbackType, FlowType, ResourceApplyErrCode, TodoType
from backend.ticket.flow_manager.base import BaseTicketFlow
Expand Down Expand Up @@ -150,7 +146,7 @@ def _format_resource_hosts(self, hosts):

def apply_resource(self, ticket_data):
"""资源申请"""
apply_params: Dict[str, Union[str, List]] = {
apply_params: Dict[str, Union[str, Any]] = {
"for_biz_id": ticket_data["bk_biz_id"],
"resource_type": self.ticket.group,
"bill_id": str(self.ticket.id),
Expand All @@ -165,6 +161,12 @@ def apply_resource(self, ticket_data):
if not apply_params["details"]:
return "", {}

# groups_in_same_location只在同城同园区亲和性下才成效,保证所有组申请的机器都在同园区
# 目前所有组亲和性相同,任取一个判断即可
affinity = apply_params["details"][0]["affinity"]
if affinity in [AffinityEnum.SAME_SUBZONE, AffinityEnum.SAME_SUBZONE_CROSS_SWTICH]:
apply_params.update(groups_in_same_location=True)

# 向资源池申请机器
resp = DBResourceApi.resource_pre_apply(params=apply_params, raw=True)
if resp["code"] == ResourceApplyErrCode.RESOURCE_LAKE:
Expand Down Expand Up @@ -405,83 +407,3 @@ class ResourceBatchDeliveryFlow(ResourceDeliveryFlow):
def _run(self) -> str:
# 暂时与单独交付节点没有区别
return super()._run()


class FakeResourceApplyFlow(ResourceApplyFlow):
def apply_resource(self, ticket_data):
"""模拟资源池申请"""

host_in_use = set(cache.get(HOST_IN_USE, []))

resp = ResourceQueryHelper.query_cc_hosts(
{"bk_biz_id": env.DBA_APP_BK_BIZ_ID, "bk_inst_id": 7, "bk_obj_id": "module"},
[],
0,
1000,
CommonEnum.DEFAULT_HOST_FIELDS.value,
return_status=True,
bk_cloud_id=0,
)
count, apply_data = resp["count"], list(filter(lambda x: x["status"] == 1, resp["info"]))

for item in apply_data:
item["ip"] = item["bk_host_innerip"]

# 排除缓存占用的主机
host_free = list(filter(lambda x: x["bk_host_id"] not in host_in_use, apply_data))

index = 0
expected_count = 0
node_infos: Dict[str, List] = defaultdict(list)
for detail in self.fetch_apply_params(ticket_data):
role, count = detail["group_mark"], detail["count"]
host_infos = host_free[index : index + count]
try:
if "backend_group" in role:
backend_group_name = role.rsplit("_", 1)[0]
node_infos[backend_group_name].append({"master": host_infos[0], "slave": host_infos[1]})
else:
node_infos[role] = host_infos
except IndexError:
raise ResourceApplyException(_("模拟资源申请失败,主机数量不够"))

index += count
expected_count += len(host_infos)

if expected_count < index:
raise ResourceApplyException(_("模拟资源申请失败,主机数量不够:{} < {}").format(count, index))

logger.info(_("模拟资源申请成功(%s):%s"), expected_count, node_infos)

# 添加新占用的主机
host_in_use = host_in_use.union(list(map(lambda x: x["bk_host_id"], host_free[:index])))
cache.set(HOST_IN_USE, list(host_in_use))

return count, node_infos


class FakeResourceBatchApplyFlow(FakeResourceApplyFlow, ResourceBatchApplyFlow):
pass


class FakeResourceDeliveryFlow(ResourceDeliveryFlow):
"""
内置资源申请交付流程,暂时无需操作
"""

def confirm_resource(self, ticket_data):
pass

def _run(self) -> str:
self.confirm_resource(self.ticket.details)
return super()._run()


class FakeResourceBatchDeliveryFlow(FakeResourceDeliveryFlow):
"""
内置资源申请批量交付流程,主要是通知资源池机器使用成功
"""

def _run(self) -> str:
# 暂时与单独交付节点没有区别
return super()._run()
6 changes: 3 additions & 3 deletions helm-charts/bk-dbm/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies:
version: 7.9.8
- name: dbm
repository: file://charts/dbm
version: 0.1.44
version: 0.1.45
- name: dbconfig
repository: file://charts/dbconfig
version: 0.1.13
Expand Down Expand Up @@ -65,5 +65,5 @@ dependencies:
- name: db-dbha
repository: file://charts/db-dbha
version: 0.1.0
digest: sha256:293e4133fc64f236159a34fdbc0ab61a732ef1fa5f7b76fa91c3de18d1c59d48
generated: "2024-09-25T19:33:01.490734+08:00"
digest: sha256:4569713d2f5d7645e6e9e6a33424e0a6ea226da1623acddcea7ee55c5e5af43e
generated: "2025-01-09T10:17:34.916203+08:00"
Loading
Loading