Skip to content

Commit

Permalink
Merge pull request #307 from tbs60/dev_yanafu
Browse files Browse the repository at this point in the history
feat: 公共集群资源监控优化
  • Loading branch information
tming authored Oct 14, 2024
2 parents a14c8f5 + f33018c commit c746e5c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 6 deletions.
9 changes: 5 additions & 4 deletions src/backend/booster/server/pkg/api/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/codec"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/types"
commonTypes "github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/types"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/server/pkg/api"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/server/pkg/engine"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/server/pkg/manager"
"github.com/TencentBlueKing/bk-turbo/src/backend/booster/server/pkg/types"

"github.com/emicklei/go-restful"
)
Expand Down Expand Up @@ -167,7 +168,7 @@ func ReleaseResource(req *restful.Request, resp *restful.Response) {
blog.Errorf("release resource: release task(%s) failed, url(%s): %v",
param.TaskID, req.Request.URL.String(), err)

if err == engine.ErrorUnterminatedTaskNoFound {
if err == engine.ErrorUnterminatedTaskNoFound || err == types.ErrorTaskAlreadyTerminated {
api.ReturnRest(&api.RestResponse{Resp: resp, Message: err.Error()})
return
}
Expand Down Expand Up @@ -210,7 +211,7 @@ func getApplyParam(req *restful.Request) (*manager.TaskCreateParam, error) {
}

param := &manager.TaskCreateParam{
ProjectID: types.GetProjectIDWithScene(protocol.ProjectID, protocol.Scene),
ProjectID: commonTypes.GetProjectIDWithScene(protocol.ProjectID, protocol.Scene),
BuildID: protocol.BuildID,
ClientVersion: protocol.ClientVersion,
ClientCPU: protocol.ClientCPU,
Expand Down Expand Up @@ -240,7 +241,7 @@ func getMessageParam(req *restful.Request) (*ParamMessage, error) {
if protocol.Type == "" {
protocol.Type = MessageTask
}
protocol.ProjectID = types.GetProjectIDWithScene(protocol.ProjectID, protocol.Scene)
protocol.ProjectID = commonTypes.GetProjectIDWithScene(protocol.ProjectID, protocol.Scene)

blog.Debugf("get message param: get message: %s", string(body))
return &protocol, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,38 @@ func (o *operator) getFederationTotalNum(url string, ist config.InstanceType) (*
return result, nil
}

func (o *operator) getPodList(clusterID string) (*coreV1.PodList, error) {
blog.Debugf("k8s-operator: begin to get federation podlist %s, %s", clusterID, o.conf.BcsNamespace)

if o.conf.BcsNamespace == "" {
return nil, fmt.Errorf("k8s-operator: get podlist failed clusterID(%s): namespace is nil", clusterID)
}
client, err := o.getClientSet(clusterID)
if err != nil {
blog.Errorf("k8s-operator: try to get podlist clusterID(%s) namespace(%s)"+
"and get client set failed: %v", clusterID, o.conf.BcsNamespace, err)
return nil, err
}
podList, err := client.clientSet.CoreV1().Pods(o.conf.BcsNamespace).List(context.TODO(), metaV1.ListOptions{})
if err != nil {
blog.Errorf("k8s-operator: get pod list from k8s failed clusterID(%s): %v", clusterID, err)
return nil, err
}
return podList, nil
}

func (o *operator) getFederationResource(clusterID string) ([]*op.NodeInfo, error) {
blog.Debugf("k8s-operator: begin to get federation resource %s, %s", clusterID, o.conf.BcsNamespace)
nodeInfoList := make([]*op.NodeInfo, 0, 1000)
if o.conf.BcsNamespace == "" {
return nil, fmt.Errorf("crm: get federation resource request failed clusterID(%s): namespace is nil", clusterID)
}
url := fmt.Sprintf(bcsAPIFederatedURI, o.conf.BcsAPIPool.GetAddress(), clusterID, o.conf.BcsNamespace)
podList, err := o.getPodList(clusterID)
if err != nil {
return nodeInfoList, fmt.Errorf("crm: get federation resource request failed clusterID(%s): %s", clusterID, err)
}

for _, ist := range o.conf.InstanceType {
result, err := o.getFederationTotalNum(url, ist)
if err != nil { //接口请求失败,直接返回错误
Expand All @@ -433,12 +458,37 @@ func (o *operator) getFederationResource(clusterID string) ([]*op.NodeInfo, erro
return nodeInfoList, err
}
totalIst := float64(result.Data.Total)
resourceUsed := make(coreV1.ResourceList)
for _, pod := range podList.Items {
if pod.Status.Phase == coreV1.PodSucceeded || pod.Status.Phase == coreV1.PodFailed {
continue
}
if pod.Spec.NodeSelector != nil {
if pod.Spec.NodeSelector[o.platformLabelKey] != ist.Platform ||
pod.Spec.NodeSelector[o.cityLabelKey] != ist.Group {
continue
}
}
for podName, podLimitValue := range podLimits(&pod) {
if value, ok := resourceUsed[podName]; !ok {
resourceUsed[podName] = podLimitValue.DeepCopy()
} else {
value.Add(podLimitValue)
resourceUsed[podName] = value
}
}
}
nodeInfoList = append(nodeInfoList, &op.NodeInfo{
IP: clusterID + "-" + o.conf.BcsNamespace + "-" + ist.Platform + "-" + ist.Group,
Hostname: clusterID + "-" + o.conf.BcsNamespace + "-" + ist.Platform + "-" + ist.Group,
DiskLeft: totalIst,
CPULeft: totalIst * ist.CPUPerInstance,
MemLeft: totalIst * ist.MemPerInstance,
//CPULeft: totalIst * ist.CPUPerInstance,
//MemLeft: totalIst * ist.MemPerInstance,
MemUsed: float64(resourceUsed.Memory().Value()) / 1024 / 1024,
CPUUsed: float64(resourceUsed.Cpu().Value()),
DiskUsed: float64(resourceUsed.StorageEphemeral().Value()),
CPUTotal: float64(resourceUsed.Cpu().Value()) + totalIst*ist.CPUPerInstance,
MemTotal: float64(resourceUsed.Memory().Value())/1024/1024 + totalIst*ist.MemPerInstance,
Attributes: map[string]string{
op.AttributeKeyPlatform: ist.Platform,
op.AttributeKeyCity: ist.Group,
Expand Down Expand Up @@ -984,6 +1034,19 @@ func podRequests(pod *coreV1.Pod) coreV1.ResourceList {
return requests
}

func podLimits(pod *coreV1.Pod) coreV1.ResourceList {
limits := coreV1.ResourceList{}
for _, container := range pod.Spec.Containers {
addResourceList(limits, container.Resources.Limits)
}
// init containers define the minimum of any resource
for _, container := range pod.Spec.InitContainers {
maxResourceList(limits, container.Resources.Limits)
}

return limits
}

// addResourceList adds the resources in newList to list
func addResourceList(list, new coreV1.ResourceList) {
for name, quantity := range new {
Expand Down

0 comments on commit c746e5c

Please sign in to comment.