Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dbm-services): add global monitor for DBHA #9056

Open
wants to merge 1 commit into
base: v1.5.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbm-services/common/dbha/ha-module/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
SHELL := /bin/bash
BASE_DIR = $(shell pwd)
VERSION = 0.0.1
GITHASH = ""
VERSION = $(shell git describe --tags --always --dirty)
GITHASH = $(shell git rev-parse --short HEAD)
APPNAME = dbha
GOOS ?= linux
BUILD_FLAG = " -X main.version=${VERSION} -X main.githash=${GITHASH} "
Expand Down
38 changes: 22 additions & 16 deletions dbm-services/common/dbha/ha-module/agent/monitor_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewMonitorAgent(conf *config.Config, detectType string) (*MonitorAgent, err
// report agent's heartbeat info.
func (a *MonitorAgent) Process(instances map[string]dbutil.DataBaseDetect) {
var wg sync.WaitGroup
startTime := time.Now().Unix()
startTime := time.Now()
sem := make(chan struct{}, a.MaxConcurrency) // 创建一个有缓冲的通道,容量为 maxConcurrency
log.Logger.Debugf("[%s] need to detect instances number:%d", a.DetectType, len(a.DBInstance))
for _, ins := range instances {
Expand All @@ -113,9 +113,10 @@ func (a *MonitorAgent) Process(instances map[string]dbutil.DataBaseDetect) {
}(ins)
}
wg.Wait()
interval := int(time.Now().Sub(startTime).Seconds())
log.Logger.Debugf("[%s] detected instances number:%d ,cost: %d",
a.DetectType, len(a.DBInstance), time.Now().Unix()-startTime)
a.DetectPostProcess()
a.DetectType, len(a.DBInstance), interval)
a.DetectPostProcess(interval)
time.Sleep(time.Second)
}

Expand Down Expand Up @@ -185,8 +186,8 @@ func (a *MonitorAgent) DoDetectSingle(ins dbutil.DataBaseDetect) {
}

// DetectPostProcess post agent heartbeat
func (a *MonitorAgent) DetectPostProcess() {
err := a.reporterHeartbeat()
func (a *MonitorAgent) DetectPostProcess(interval int) {
err := a.reporterHeartbeat(interval)
if err != nil {
log.Logger.Errorf("reporter heartbeat failed. err:%s", err.Error())
}
Expand Down Expand Up @@ -232,15 +233,19 @@ func (a *MonitorAgent) FetchDBInstance() error {
a.HashMod = mod
a.HashValue = modValue

req := client.DBInstanceInfoRequest{
req := client.DBInstanceByCityRequest{
LogicalCityIDs: []int{a.CityID},
HashCnt: mod,
HashValue: modValue,
ClusterTypes: []string{a.DetectType},
}

rawInfo, err := a.CmDBClient.GetDBInstanceInfoByClusterType(req)
rawInfo, err := a.CmDBClient.GetDBInstanceInfoByCityID(req)
if err != nil {
minInfo := monitor.GetApiAlertInfo(constvar.CmDBInstanceUrl, err.Error())
if e := monitor.MonitorSend("get instances failed", minInfo); e != nil {
log.Logger.Warnf(e.Error())
}
log.Logger.Errorf("get instance info from cmdb failed. err:%s", err.Error())
return err
}
Expand Down Expand Up @@ -306,20 +311,20 @@ func (a *MonitorAgent) FetchGMInstance() error {
continue
}
// needn't lock
_, ok := a.GMInstance[info.Ip]
_, ok := a.GMInstance[info.IP]
if ok {
a.GMInstance[info.Ip].LastFetchTime = time.Now()
a.GMInstance[info.IP].LastFetchTime = time.Now()
} else {
a.GMInstance[info.Ip] = &GMConnection{
Ip: info.Ip,
a.GMInstance[info.IP] = &GMConnection{
Ip: info.IP,
Port: info.Port,
LastFetchTime: time.Now(),
IsClose: false,
}
err = a.GMInstance[info.Ip].Init()
err = a.GMInstance[info.IP].Init()
if err != nil {
log.Logger.Errorf("init gm failed. gm_ip:%s, gm_port:%d, err:%s",
info.Ip, info.Port, err.Error())
info.Port, info.Port, err.Error())
return err
}
}
Expand All @@ -342,6 +347,7 @@ func (a *MonitorAgent) NeedReportGM(ins dbutil.DataBaseDetect) bool {
cachedIns := a.ReportGMCache[ip]
now := time.Now()
if now.Before(cachedIns.ReporterGMTime.Add(time.Second * time.Duration(cachedIns.ExpireInterval))) {
log.Logger.Debugf("instance[%s] cached, skip report to gm", cachedIns.Ip)
return false
}
}
Expand Down Expand Up @@ -395,6 +401,7 @@ func (a *MonitorAgent) ReportDetectInfoToGM(reporterInstance dbutil.DataBaseDete
//do retry
continue
} else {
log.Logger.Debugf("reporter instance[%s#%d] to gm[%s#%d] success", ip, port, gmIns.Ip, gmIns.Port)
isReported = true
gmIns.Mutex.Unlock()
a.ReportGMCache[ip] = &CachedHostInfo{
Expand Down Expand Up @@ -484,9 +491,8 @@ func (a *MonitorAgent) registerAgentInfoToHaDB() error {
}

// reporterHeartbeat send agent heartbeat to HA-DB
func (a *MonitorAgent) reporterHeartbeat() error {
interval := time.Now().Sub(a.heartbeat).Seconds()
err := a.HaDBClient.ReporterAgentHeartbeat(a.MonIp, a.DetectType, int(interval), a.HashMod, a.HashValue)
func (a *MonitorAgent) reporterHeartbeat(interval int) error {
err := a.HaDBClient.ReporterAgentHeartbeat(a.MonIp, a.DetectType, interval, a.HashMod, a.HashValue)
a.heartbeat = time.Now()
return err
}
Expand Down
5 changes: 3 additions & 2 deletions dbm-services/common/dbha/ha-module/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ func (c *Client) DoNewForCB(
}

var retryErr error
var response interface{}
for retryIdx := 0; retryIdx < 5; retryIdx++ {
response, retryErr := c.doNewInner(method, url, params, headers, bodyCB)
response, retryErr = c.doNewInner(method, url, params, headers, bodyCB)
if retryErr == nil {
return response, nil
}
Expand Down Expand Up @@ -211,7 +212,7 @@ func (c *Client) doNewInner(method, url string, params interface{},

result, err := bodyCB(b)
if err != nil {
log.Logger.Errorf(err.Error())
log.Logger.Errorf(fmt.Sprintf("%s:%s", util.AtWhere(), err.Error()))
return nil, err
}
return result, nil
Expand Down
95 changes: 42 additions & 53 deletions dbm-services/common/dbha/ha-module/client/cmdb.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,40 @@
package client

import (
"encoding/json"
"fmt"
"net/http"

"dbm-services/common/dbha/ha-module/config"
"dbm-services/common/dbha/ha-module/constvar"
"dbm-services/common/dbha/ha-module/log"
"dbm-services/common/dbha/ha-module/util"

"encoding/json"
"fmt"
"net/http"
)

// CmDBClient client to request cmdb
type CmDBClient struct {
Client
}

// DBInstanceInfoByAddressRequest fetch instances list from cmdb by ip
type DBInstanceInfoByAddressRequest struct {
// DBInstanceByAddressRequest fetch instances list from cmdb by ip
type DBInstanceByAddressRequest struct {
DBCloudToken string `json:"db_cloud_token"`
BKCloudID int `json:"bk_cloud_id"`
Addresses []string `json:"addresses"`
}

// DBInstanceInfoRequest fetch instances list from cmdb by city and status
type DBInstanceInfoRequest struct {
// DBInstanceByClusterTypeRequest fetch instances list from cmdb by ip
type DBInstanceByClusterTypeRequest struct {
DBCloudToken string `json:"db_cloud_token"`
BKCloudID int `json:"bk_cloud_id"`
Statuses []string `json:"statuses"`
HashCnt int `json:"hash_cnt"`
HashValue int `json:"hash_value"`
ClusterTypes []string `json:"cluster_types"`
}

// DBInstanceByCityRequest fetch instances list from cmdb by city and status
type DBInstanceByCityRequest struct {
DBCloudToken string `json:"db_cloud_token"`
BKCloudID int `json:"bk_cloud_id"`
LogicalCityIDs []int `json:"logical_city_ids"`
Expand Down Expand Up @@ -135,7 +145,7 @@ func NewCmDBClient(conf *config.APIConfig, cloudId int) *CmDBClient {
// GetDBInstanceInfoByIp fetch instance info from cmdb by ip
func (c *CmDBClient) GetDBInstanceInfoByIp(ip string) ([]interface{}, error) {
var res []interface{}
req := DBInstanceInfoByAddressRequest{
req := DBInstanceByAddressRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
Addresses: []string{ip},
Expand All @@ -156,41 +166,20 @@ func (c *CmDBClient) GetDBInstanceInfoByIp(ip string) ([]interface{}, error) {
return res, nil
}

// GetAllDBInstanceInfo detect running, available status instance
func (c *CmDBClient) GetAllDBInstanceInfo() ([]interface{}, error) {
req := DBInstanceInfoRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
Statuses: []string{constvar.RUNNING, constvar.AVAILABLE},
}

response, err := c.DoNew(
http.MethodPost, c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CmDBInstanceUrl, ""), req, nil)
if err != nil {
return nil, err
}
if response.Code != 0 {
return nil, fmt.Errorf("%s failed, return code:%d, msg:%s", util.AtWhere(), response.Code, response.Msg)
}

var res []interface{}
err = json.Unmarshal(response.Data, &res)
if err != nil {
return nil, err
}

return res, nil
}

// GetDBInstanceInfoByCity detect running, available status instance
func (c *CmDBClient) GetDBInstanceInfoByCity(cityID int) ([]interface{}, error) {
req := DBInstanceInfoRequest{
// GetDBInstanceInfoByCityID detect running, available status instance
func (c *CmDBClient) GetDBInstanceInfoByCityID(requestInfo DBInstanceByCityRequest) ([]interface{}, error) {
req := DBInstanceByCityRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
LogicalCityIDs: []int{cityID},
LogicalCityIDs: requestInfo.LogicalCityIDs,
Statuses: []string{constvar.RUNNING, constvar.AVAILABLE},
HashCnt: requestInfo.HashCnt,
HashValue: requestInfo.HashValue,
ClusterTypes: requestInfo.ClusterTypes,
}

log.Logger.Debugf("GetDBInstanceInfoByCityID param:%#v", req)

response, err := c.DoNew(
http.MethodPost, c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CmDBInstanceUrl, ""), req, nil)
if err != nil {
Expand All @@ -209,19 +198,18 @@ func (c *CmDBClient) GetDBInstanceInfoByCity(cityID int) ([]interface{}, error)
return res, nil
}

// GetDBInstanceInfoByClusterType detect running, available status instance
func (c *CmDBClient) GetDBInstanceInfoByClusterType(requestInfo DBInstanceInfoRequest) ([]interface{}, error) {
req := DBInstanceInfoRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
LogicalCityIDs: requestInfo.LogicalCityIDs,
Statuses: []string{constvar.RUNNING, constvar.AVAILABLE},
HashCnt: requestInfo.HashCnt,
HashValue: requestInfo.HashValue,
ClusterTypes: requestInfo.ClusterTypes,
// GetDBInstanceByClusterType detect running, available status instance
func (c *CmDBClient) GetDBInstanceByClusterType(requestInfo DBInstanceByClusterTypeRequest) ([]interface{}, error) {
req := DBInstanceByClusterTypeRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
Statuses: []string{constvar.RUNNING, constvar.AVAILABLE},
HashCnt: requestInfo.HashCnt,
HashValue: requestInfo.HashValue,
ClusterTypes: requestInfo.ClusterTypes,
}

log.Logger.Debugf("GetDBInstanceInfo param:%#v", req)
log.Logger.Debugf("GetDBInstanceByClusterType param:%#v", req)

response, err := c.DoNew(
http.MethodPost, c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CmDBInstanceUrl, ""), req, nil)
Expand All @@ -241,15 +229,16 @@ func (c *CmDBClient) GetDBInstanceInfoByClusterType(requestInfo DBInstanceInfoRe
return res, nil
}

// GetDBInstanceInfoByCluster fetch instance info from cmdb by ip
func (c *CmDBClient) GetDBInstanceInfoByCluster(clusterName string) ([]interface{}, error) {
// GetDBInstanceInfoByAddress fetch instance info from cmdb by ip
func (c *CmDBClient) GetDBInstanceInfoByAddress(clusterName string) ([]interface{}, error) {
var res []interface{}
req := DBInstanceInfoByAddressRequest{
req := DBInstanceByAddressRequest{
DBCloudToken: c.Conf.BKConf.BkToken,
BKCloudID: c.CloudId,
Addresses: []string{clusterName},
}

log.Logger.Debugf("GetDBInstanceInfoByAddress param:%#v", req)
response, err := c.DoNew(
http.MethodPost, c.SpliceUrlByPrefix(c.Conf.UrlPre, constvar.CmDBInstanceUrl, ""), req, nil)
if err != nil {
Expand Down
Loading