Skip to content

Commit

Permalink
feat(dbm-services): add global monitor for DBHA close #9055
Browse files Browse the repository at this point in the history
  • Loading branch information
xjxia committed Jan 14, 2025
1 parent 7bbded4 commit 71fab74
Show file tree
Hide file tree
Showing 19 changed files with 573 additions and 321 deletions.
4 changes: 2 additions & 2 deletions dbm-services/common/dbha/ha-module/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
SHELL := /bin/bash
BASE_DIR = $(shell pwd)
VERSION = 0.0.1
GITHASH = ""
VERSION = $(shell git describe --tags --always --dirty)
GITHASH = $(shell git rev-parse --short HEAD)
APPNAME = dbha
GOOS ?= linux
BUILD_FLAG = " -X main.version=${VERSION} -X main.githash=${GITHASH} "
Expand Down
38 changes: 22 additions & 16 deletions dbm-services/common/dbha/ha-module/agent/monitor_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewMonitorAgent(conf *config.Config, detectType string) (*MonitorAgent, err
// report agent's heartbeat info.
func (a *MonitorAgent) Process(instances map[string]dbutil.DataBaseDetect) {
var wg sync.WaitGroup
startTime := time.Now().Unix()
startTime := time.Now()
sem := make(chan struct{}, a.MaxConcurrency) // 创建一个有缓冲的通道,容量为 maxConcurrency
log.Logger.Debugf("[%s] need to detect instances number:%d", a.DetectType, len(a.DBInstance))
for _, ins := range instances {
Expand All @@ -113,9 +113,10 @@ func (a *MonitorAgent) Process(instances map[string]dbutil.DataBaseDetect) {
}(ins)
}
wg.Wait()
interval := int(time.Now().Sub(startTime).Seconds())
log.Logger.Debugf("[%s] detected instances number:%d ,cost: %d",
a.DetectType, len(a.DBInstance), time.Now().Unix()-startTime)
a.DetectPostProcess()
a.DetectType, len(a.DBInstance), interval)
a.DetectPostProcess(interval)
time.Sleep(time.Second)
}

Expand Down Expand Up @@ -185,8 +186,8 @@ func (a *MonitorAgent) DoDetectSingle(ins dbutil.DataBaseDetect) {
}

// DetectPostProcess post agent heartbeat
func (a *MonitorAgent) DetectPostProcess() {
err := a.reporterHeartbeat()
func (a *MonitorAgent) DetectPostProcess(interval int) {
err := a.reporterHeartbeat(interval)
if err != nil {
log.Logger.Errorf("reporter heartbeat failed. err:%s", err.Error())
}
Expand Down Expand Up @@ -232,15 +233,19 @@ func (a *MonitorAgent) FetchDBInstance() error {
a.HashMod = mod
a.HashValue = modValue

req := client.DBInstanceInfoRequest{
req := client.DBInstanceByCityRequest{
LogicalCityIDs: []int{a.CityID},
HashCnt: mod,
HashValue: modValue,
ClusterTypes: []string{a.DetectType},
}

rawInfo, err := a.CmDBClient.GetDBInstanceInfoByClusterType(req)
rawInfo, err := a.CmDBClient.GetDBInstanceInfoByCityID(req)
if err != nil {
minInfo := monitor.GetApiAlertInfo(constvar.CmDBInstanceUrl, err.Error())
if e := monitor.MonitorSend("get instances failed", minInfo); e != nil {
log.Logger.Warnf(e.Error())
}
log.Logger.Errorf("get instance info from cmdb failed. err:%s", err.Error())
return err
}
Expand Down Expand Up @@ -306,20 +311,20 @@ func (a *MonitorAgent) FetchGMInstance() error {
continue
}
// needn't lock
_, ok := a.GMInstance[info.Ip]
_, ok := a.GMInstance[info.IP]
if ok {
a.GMInstance[info.Ip].LastFetchTime = time.Now()
a.GMInstance[info.IP].LastFetchTime = time.Now()
} else {
a.GMInstance[info.Ip] = &GMConnection{
Ip: info.Ip,
a.GMInstance[info.IP] = &GMConnection{
Ip: info.IP,
Port: info.Port,
LastFetchTime: time.Now(),
IsClose: false,
}
err = a.GMInstance[info.Ip].Init()
err = a.GMInstance[info.IP].Init()
if err != nil {
log.Logger.Errorf("init gm failed. gm_ip:%s, gm_port:%d, err:%s",
info.Ip, info.Port, err.Error())
info.Port, info.Port, err.Error())
return err
}
}
Expand All @@ -342,6 +347,7 @@ func (a *MonitorAgent) NeedReportGM(ins dbutil.DataBaseDetect) bool {
cachedIns := a.ReportGMCache[ip]
now := time.Now()
if now.Before(cachedIns.ReporterGMTime.Add(time.Second * time.Duration(cachedIns.ExpireInterval))) {
log.Logger.Debugf("instance[%s] cached, skip report to gm", cachedIns.Ip)
return false
}
}
Expand Down Expand Up @@ -395,6 +401,7 @@ func (a *MonitorAgent) ReportDetectInfoToGM(reporterInstance dbutil.DataBaseDete
//do retry
continue
} else {
log.Logger.Debugf("reporter instance[%s#%d] to gm[%s#%d] success", ip, port, gmIns.Ip, gmIns.Port)
isReported = true
gmIns.Mutex.Unlock()
a.ReportGMCache[ip] = &CachedHostInfo{
Expand Down Expand Up @@ -484,9 +491,8 @@ func (a *MonitorAgent) registerAgentInfoToHaDB() error {
}

// reporterHeartbeat send agent heartbeat to HA-DB
func (a *MonitorAgent) reporterHeartbeat() error {
interval := time.Now().Sub(a.heartbeat).Seconds()
err := a.HaDBClient.ReporterAgentHeartbeat(a.MonIp, a.DetectType, int(interval), a.HashMod, a.HashValue)
func (a *MonitorAgent) reporterHeartbeat(interval int) error {
err := a.HaDBClient.ReporterAgentHeartbeat(a.MonIp, a.DetectType, interval, a.HashMod, a.HashValue)
a.heartbeat = time.Now()
return err
}
Expand Down
5 changes: 3 additions & 2 deletions dbm-services/common/dbha/ha-module/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ func (c *Client) DoNewForCB(
}

var retryErr error
var response interface{}
for retryIdx := 0; retryIdx < 5; retryIdx++ {
response, retryErr := c.doNewInner(method, url, params, headers, bodyCB)
response, retryErr = c.doNewInner(method, url, params, headers, bodyCB)
if retryErr == nil {
return response, nil
}
Expand Down Expand Up @@ -211,7 +212,7 @@ func (c *Client) doNewInner(method, url string, params interface{},

result, err := bodyCB(b)
if err != nil {
log.Logger.Errorf(err.Error())
log.Logger.Errorf(fmt.Sprintf("%s:%s", util.AtWhere(), err.Error()))
return nil, err
}
return result, nil
Expand Down
95 changes: 42 additions & 53 deletions dbm-services/common/dbha/ha-module/client/cmdb.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,40 @@
package client

import (
"encoding/json"
"fmt"
"net/http"

"dbm-services/common/dbha/ha-module/config"
"dbm-services/common/dbha/ha-module/constvar"
"dbm-services/common/dbha/ha-module/log"
"dbm-services/common/dbha/ha-module/util"

"encoding/json"
"fmt"
"net/http"
)

// CmDBClient client to request cmdb
type CmDBClient struct {
Client
}

// DBInstanceInfoByAddressRequest fetch instances list from cmdb by ip
type DBInstanceInfoByAddressRequest struct {
// DBInstanceByAddressRequest fetch instances list from cmdb by ip
type DBInstanceByAddressRequest struct {
DBCloudToken string `json:"db_cloud_token"`
BKCloudID int `json:"bk_cloud_id"`
Addresses []string `json:"addresses"`
}

// DBInstanceInfoRequest fetch instances list from cmdb by city and status
type DBInstanceInfoRequest struct {
// DBInstanceByClusterTypeRequest fetch instances list from cmdb by cluster type and status
type DBInstanceByClusterTypeRequest struct {
DBCloudToken string `json:"db_cloud_token"`
BKCloudID int `json:"bk_cloud_id"`
Statuses []string `json:"statuses"`
HashCnt int `json:"hash_cnt"`
HashValue int `json:"hash_value"`
ClusterTypes []string `json:"cluster_types"`
}

// DBInstanceByCityRequest fetch instances list from cmdb by city and status
type DBInstanceByCityRequest struct {
DBCloudToken string `json:"db_cloud_token"`
BKCloudID int `json:"bk_cloud_id"`
LogicalCityIDs []int `json:"logical_city_ids"`
Expand Down Expand Up @@ -135,7 +145,7 @@ func NewCmDBClient(conf *config.APIConfig, cloudId int) *CmDBClient {
// GetDBInstanceInfoByIp fetch instance info from cmdb by ip
func (c *CmDBClient) GetDBInstanceInfoByIp(ip string) ([]interface{}, error) {
var res []interface{}
req := DBInstanceInfoByAddressRequest{
req := DBInstanceByAddressRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
Addresses: []string{ip},
Expand All @@ -156,41 +166,20 @@ func (c *CmDBClient) GetDBInstanceInfoByIp(ip string) ([]interface{}, error) {
return res, nil
}

// GetAllDBInstanceInfo detect running, available status instance
func (c *CmDBClient) GetAllDBInstanceInfo() ([]interface{}, error) {
req := DBInstanceInfoRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
Statuses: []string{constvar.RUNNING, constvar.AVAILABLE},
}

response, err := c.DoNew(
http.MethodPost, c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CmDBInstanceUrl, ""), req, nil)
if err != nil {
return nil, err
}
if response.Code != 0 {
return nil, fmt.Errorf("%s failed, return code:%d, msg:%s", util.AtWhere(), response.Code, response.Msg)
}

var res []interface{}
err = json.Unmarshal(response.Data, &res)
if err != nil {
return nil, err
}

return res, nil
}

// GetDBInstanceInfoByCity detect running, available status instance
func (c *CmDBClient) GetDBInstanceInfoByCity(cityID int) ([]interface{}, error) {
req := DBInstanceInfoRequest{
// GetDBInstanceInfoByCityID fetches instances in running or available status from cmdb, filtered by logical city IDs, hash and cluster types
func (c *CmDBClient) GetDBInstanceInfoByCityID(requestInfo DBInstanceByCityRequest) ([]interface{}, error) {
req := DBInstanceByCityRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
LogicalCityIDs: []int{cityID},
LogicalCityIDs: requestInfo.LogicalCityIDs,
Statuses: []string{constvar.RUNNING, constvar.AVAILABLE},
HashCnt: requestInfo.HashCnt,
HashValue: requestInfo.HashValue,
ClusterTypes: requestInfo.ClusterTypes,
}

log.Logger.Debugf("GetDBInstanceInfoByCityID param:%#v", req)

response, err := c.DoNew(
http.MethodPost, c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CmDBInstanceUrl, ""), req, nil)
if err != nil {
Expand All @@ -209,19 +198,18 @@ func (c *CmDBClient) GetDBInstanceInfoByCity(cityID int) ([]interface{}, error)
return res, nil
}

// GetDBInstanceInfoByClusterType detect running, available status instance
func (c *CmDBClient) GetDBInstanceInfoByClusterType(requestInfo DBInstanceInfoRequest) ([]interface{}, error) {
req := DBInstanceInfoRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
LogicalCityIDs: requestInfo.LogicalCityIDs,
Statuses: []string{constvar.RUNNING, constvar.AVAILABLE},
HashCnt: requestInfo.HashCnt,
HashValue: requestInfo.HashValue,
ClusterTypes: requestInfo.ClusterTypes,
// GetDBInstanceByClusterType fetches instances in running or available status from cmdb, filtered by hash and cluster types
func (c *CmDBClient) GetDBInstanceByClusterType(requestInfo DBInstanceByClusterTypeRequest) ([]interface{}, error) {
req := DBInstanceByClusterTypeRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
Statuses: []string{constvar.RUNNING, constvar.AVAILABLE},
HashCnt: requestInfo.HashCnt,
HashValue: requestInfo.HashValue,
ClusterTypes: requestInfo.ClusterTypes,
}

log.Logger.Debugf("GetDBInstanceInfo param:%#v", req)
log.Logger.Debugf("GetDBInstanceByClusterType param:%#v", req)

response, err := c.DoNew(
http.MethodPost, c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CmDBInstanceUrl, ""), req, nil)
Expand All @@ -241,15 +229,16 @@ func (c *CmDBClient) GetDBInstanceInfoByClusterType(requestInfo DBInstanceInfoRe
return res, nil
}

// GetDBInstanceInfoByCluster fetch instance info from cmdb by ip
func (c *CmDBClient) GetDBInstanceInfoByCluster(clusterName string) ([]interface{}, error) {
// GetDBInstanceInfoByAddress fetch instance info from cmdb by ip
func (c *CmDBClient) GetDBInstanceInfoByAddress(clusterName string) ([]interface{}, error) {
var res []interface{}
req := DBInstanceInfoByAddressRequest{
req := DBInstanceByAddressRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
Addresses: []string{clusterName},
}

log.Logger.Debugf("GetDBInstanceInfoByAddress param:%#v", req)
response, err := c.DoNew(
http.MethodPost, c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CmDBInstanceUrl, ""), req, nil)
if err != nil {
Expand Down
Loading

0 comments on commit 71fab74

Please sign in to comment.