Skip to content

Commit

Permalink
feat:support result cache, issue: TencentBlueKing#315
Browse files Browse the repository at this point in the history
  • Loading branch information
tbs60 committed Jan 7, 2025
1 parent 43d900e commit 867baba
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ const (
func newExecutor(mgr *Mgr,
req *types.LocalTaskExecuteRequest,
globalWork *types.Work,
supportAbsPath bool) (*executor, error) {
supportAbsPath bool,
groupkey string,
remoteTriggleSecs int) (*executor, error) {
environ := env.NewSandbox(req.Environments)
bt := dcType.GetBoosterType(environ.GetEnv(env.BoosterType))
hdl, err := handlermap.GetHandler(bt)
Expand Down Expand Up @@ -91,7 +93,7 @@ func newExecutor(mgr *Mgr,

// TODO : 通过修改e.sandbox来影响e.handler,因为e.sandbox是个指针
// 这个方法目前是可用的,因为handler直接保存了该指针
e.initResultCacheInfo()
e.initResultCacheInfo(groupkey, remoteTriggleSecs)
if e.hitLocalIndex || e.hitRemoteIndex {
e.sandbox.Env.AppendEnv(env.KeyExecutorHasResultIndex, "true")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,25 @@ const (
DefaultTriggleSeconds = 300
)

func (e *executor) initResultCacheInfo() {
func (e *executor) initResultCacheInfo(groupkey string, remoteTriggleSecs int) {
e.cacheType = e.handler.SupportResultCache(e.req.Commands)

e.cacheGroupKey = ""
if str := e.sandbox.Env.GetEnv(env.ProjectID); str != "" {
e.cacheGroupKey = str
e.cacheGroupKey = groupkey
if e.cacheGroupKey == "" {
if str := e.sandbox.Env.GetEnv(env.ProjectID); str != "" {
e.cacheGroupKey = str
}
}

e.remoteTriggleSecs = DefaultTriggleSeconds
if str := e.sandbox.Env.GetEnv(env.KeyExecutorResultCacheTriggleSecs); str != "" {
intv, err := strconv.Atoi(str)
if err == nil && intv >= 0 {
e.remoteTriggleSecs = intv
if remoteTriggleSecs > 0 {
e.remoteTriggleSecs = remoteTriggleSecs
} else {
e.remoteTriggleSecs = DefaultTriggleSeconds
if str := e.sandbox.Env.GetEnv(env.KeyExecutorResultCacheTriggleSecs); str != "" {
intv, err := strconv.Atoi(str)
if err == nil && intv >= 0 {
e.remoteTriggleSecs = intv
}
}
}

Expand Down
64 changes: 48 additions & 16 deletions src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type data4resultcache struct {
localGroupRecord *resultcache.RecordGroup
remoteGroupRecord map[string]*resultcache.RecordGroup

remoteTriggleSecs int

hashlock sync.RWMutex
hasher hash
}
Expand Down Expand Up @@ -89,6 +91,22 @@ func (d *data4resultcache) getRecordGroup(key string) *resultcache.RecordGroup {
return nil
}

func (d *data4resultcache) initCacheList(nodes []string) {
tmplist := make(map[string]*protocol.Host)
for _, v := range nodes {
tmplist[v] = &protocol.Host{
Server: v,
TokenString: v,
Hosttype: protocol.HostRemote,
Jobs: 1,
Compresstype: protocol.CompressLZ4,
Protocol: "tcp",
}
}

d.resultCacheList = tmplist
}

func (d *data4resultcache) initHash(nodes []string) {
if len(nodes) == 0 {
return
Expand Down Expand Up @@ -175,23 +193,11 @@ func (m *Mgr) Start() {

rcl := m.work.Basic().GetCacheServer()
if len(rcl) > 0 {
tmplist := make(map[string]*protocol.Host)
for _, v := range rcl {
tmplist[v] = &protocol.Host{
Server: v,
TokenString: v,
Hosttype: protocol.HostRemote,
Jobs: 1,
Compresstype: protocol.CompressLZ4,
Protocol: "tcp",
}
}

m.resultdata.resultCacheList = tmplist
m.resultdata.initCacheList(rcl)
m.resultdata.initHash(rcl)
}

go m.initResultCacheIndex()
go m.initResultCacheIndex(settings.ProjectID)
}

m.resource.Handle(ctx)
Expand Down Expand Up @@ -244,7 +250,12 @@ func (m *Mgr) ExecuteTask(
blog.Infof("local: try to execute task(%s) for work(%s) from pid(%d) in env(%v) dir(%s)",
strings.Join(req.Commands, " "), m.work.ID(), req.Pid, req.Environments, req.Dir)

e, err := newExecutor(m, req, globalWork, m.work.Resource().SupportAbsPath())
e, err := newExecutor(m,
req,
globalWork,
m.work.Resource().SupportAbsPath(),
m.resultdata.groupKey,
m.resultdata.remoteTriggleSecs)
if err != nil {
blog.Errorf("local: try to execute task for work(%s) from pid(%d) get executor failed: %v",
m.work.ID(), req.Pid, err)
Expand Down Expand Up @@ -594,7 +605,7 @@ func (m *Mgr) getTryTimes(e *executor) int {
}

// --------------------------for result cache-----------------------------
func (m *Mgr) initResultCacheIndex() {
func (m *Mgr) initResultCacheIndex(projectid string) {
// get local firstly
m.getLocalResultCacheIndex()

Expand All @@ -603,6 +614,23 @@ func (m *Mgr) initResultCacheIndex() {
// 如果没有指定cache 列表,则从tbs server拉取
if len(m.resultdata.resultCacheList) == 0 {
blog.Infof("local: start get cache list now")
resouceMgr := m.work.Resource()
if resouceMgr != nil {
cachelist, err := resouceMgr.GetCacheList(projectid)
if err == nil && cachelist != nil && len(*cachelist) > 0 {
blog.Infof("local: got cache list:%v", *cachelist)
for _, v := range *cachelist {
if len(v.Hosts) > 0 {
m.resultdata.initCacheList(v.Hosts)
m.resultdata.initHash(v.Hosts)
}
if v.RemoteExecuteTimeThreshold > 0 {
m.resultdata.remoteTriggleSecs = v.RemoteExecuteTimeThreshold
}
break
}
}
}
}

if len(m.resultdata.resultCacheList) > 0 {
Expand Down Expand Up @@ -752,3 +780,7 @@ func (m *Mgr) reportRemoteResultCache(

return nil, nil
}

func (m *Mgr) getCacheList() {
// m.work.Resource().
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ const (
inspectDistributeTaskURI = "v2/build/task?task_id=%s"
heartbeatURI = "v2/build/heartbeat"
messageURI = "v2/build/message"
applyCacheListURI = "v2/build/applycachelist?project_id=%s"
cacheListURI = "v2/build/cachelist?project_id=%s"
)

// Mgr describe the resource manager
Expand Down Expand Up @@ -1032,3 +1032,24 @@ func (m *Mgr) request(method, server, uri string, data []byte) ([]byte, bool, er

return by, true, nil
}

// GetCacheList will get cache list from tbs-server
func (m *Mgr) GetCacheList(projectid string) (*v2.CacheConfigList, error) {
blog.Infof("resource: try to get cache list for projectid(%s)", projectid)

data, _, err := m.request("GET", m.serverHost,
fmt.Sprintf(cacheListURI, projectid), nil)
if err != nil {
blog.Errorf("resource: get cache list with projectid(%s) with error: %v", projectid, err)
return nil, err
}

var info v2.CacheConfigList
if err = codec.DecJSON(data, &info); err != nil {
blog.Errorf("resource: get cache list with projectid(%s) with error: %v", projectid, err)
return nil, err
}

blog.Infof("resource: got cache list:%v for projectid(%s)", info, projectid)
return &info, nil
}
2 changes: 2 additions & 0 deletions src/backend/booster/bk_dist/controller/pkg/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ type ResourceMgr interface {

// TODO : check whether support abs path
SupportAbsPath() bool

GetCacheList(projectid string) (*v2.CacheConfigList, error)
}

// BasicMgr describe a manager for handling all actions with work basic issues
Expand Down
2 changes: 2 additions & 0 deletions src/backend/booster/server/pkg/api/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
ServerErrGetServersFailed
ServerErrSendMessageFailed
ServerErrUnknownMessageType
ServerErrReadJSONFailed
)

var serverErrCode = map[ServerErrCode]string{
Expand All @@ -40,6 +41,7 @@ var serverErrCode = map[ServerErrCode]string{
ServerErrGetServersFailed: "get servers failed",
ServerErrSendMessageFailed: "send message failed",
ServerErrUnknownMessageType: "unknown message type",
ServerErrReadJSONFailed: "read json failed",
}

// String get error string from error code
Expand Down
3 changes: 3 additions & 0 deletions src/backend/booster/server/pkg/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func InitStorage() (err error) {
api.RegisterV2Action(api.Action{
Verb: "GET", Path: "/upgrade/worker", Params: nil, Handler: api.MasterRequired(QueryWorkerUpgradeInfo),
})
api.RegisterV2Action(api.Action{
Verb: "GET", Path: "/build/cachelist", Params: nil, Handler: api.MasterRequired(QueryCacheList),
})

return nil
}
Expand Down
60 changes: 60 additions & 0 deletions src/backend/booster/server/pkg/api/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
package v2

import (
"encoding/json"
"io"
"io/ioutil"
"net"
"os"
"strings"

"github.com/TencentBlueKing/bk-turbo/src/backend/booster/common/blog"
Expand Down Expand Up @@ -332,3 +335,60 @@ func QueryWorkerUpgradeInfo(req *restful.Request, resp *restful.Response) {
}
api.ReturnRest(&api.RestResponse{Resp: resp, Data: data})
}

// QueryCacheList handle the http request for querying cache list
func QueryCacheList(req *restful.Request, resp *restful.Response) {
projectID := req.QueryParameter(queryProjectIDKey)
if projectID == "" {
blog.Errorf("query cache list: url(%s): projectID not specified", req.Request.URL.String())
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrInvalidParam,
Message: "project_id no specific"})
return
}

// 打开JSON文件
file, err := os.Open(cacheListFile)
if err != nil {
blog.Errorf("query cache list: url(%s) open %s with error:%v",
req.Request.URL.String(), cacheListFile, err)
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrReadJSONFailed,
Message: "not found cache list"})
return
}
defer file.Close()

// 读取文件内容
bytes, err := io.ReadAll(file)
if err != nil {
blog.Errorf("query cache list: url(%s) read %s with error:%v",
req.Request.URL.String(), cacheListFile, err)
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrReadJSONFailed,
Message: "not found cache list"})
return
}

// 解析JSON到结构体
var config CacheConfigList
err = json.Unmarshal(bytes, &config)
if err != nil {
blog.Errorf("query cache list: url(%s) Unmarshal %s with error:%v",
req.Request.URL.String(), cacheListFile, err)
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrReadJSONFailed,
Message: "not found cache list"})
return
}

// 过滤
configfilter := make(map[string]CacheConfig)
v, ok := config[projectID]
if ok {
configfilter[projectID] = v
} else {
v, ok = config[cacheDefaultProjectID]
if ok {
configfilter[cacheDefaultProjectID] = v
}
}

api.ReturnRest(&api.RestResponse{Resp: resp, Data: configfilter})
}
14 changes: 13 additions & 1 deletion src/backend/booster/server/pkg/api/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ package v2
import "github.com/TencentBlueKing/bk-turbo/src/backend/booster/server/pkg/engine"

const (
queryTaskIDKey = "task_id"
queryTaskIDKey = "task_id"
queryProjectIDKey = "project_id"

cacheListFile = "cache_list.json"
cacheDefaultProjectID = "default"
)

// ParamApply describe the protocol of applying a piece of resources for distribute workers
Expand Down Expand Up @@ -102,3 +106,11 @@ const (
MessageTask MessageType = "task"
MessageProject MessageType = "project"
)

// CacheConfigList return cache config list
type CacheConfigList map[string]CacheConfig

type CacheConfig struct {
Hosts []string `json:"hosts"`
RemoteExecuteTimeThreshold int `json:"remote_execute_time_threshold"`
}

0 comments on commit 867baba

Please sign in to comment.