Skip to content

Commit

Permalink
feat:support result cache, issue: TencentBlueKing#315
Browse files Browse the repository at this point in the history
  • Loading branch information
tbs60 committed Dec 31, 2024
1 parent 74a098d commit c150409
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 52 deletions.
45 changes: 13 additions & 32 deletions src/backend/booster/bk_dist/common/resultcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,42 +339,20 @@ func (r Record) GetResultHashString() string {
return ""
}

func (r Record) CommandKeyEqual(another Record) bool {
func (r Record) EqualByKey(another Record, key string) bool {
if another == nil {
return false
}

myvalue := ""
if v, ok := r[CommandKey]; ok {
if v, ok := r[key]; ok {
myvalue = v
} else {
return false
}

anothervalue := ""
if v, ok := another[CommandKey]; ok {
anothervalue = v
} else {
return false
}

return myvalue == anothervalue
}

func (r Record) ResultKeyEqual(another Record) bool {
if another == nil {
return false
}

myvalue := ""
if v, ok := r[ResultKey]; ok {
myvalue = v
} else {
return false
}

anothervalue := ""
if v, ok := another[ResultKey]; ok {
if v, ok := another[key]; ok {
anothervalue = v
} else {
return false
Expand Down Expand Up @@ -407,7 +385,7 @@ func (rg *RecordGroup) PutRecord(record Record) {
// 要不要考虑去重?
existed := false
for i, v := range rg.Group {
if v.CommandKeyEqual(record) {
if v.EqualByKey(record, CommandKey) {
existed = true
rg.Group[i] = &record
}
Expand All @@ -427,7 +405,7 @@ func (rg *RecordGroup) DeleteRecord(record Record) {
deleted := false
// 先只考虑删除匹配上的第一个,后面看看要不要全部删除
for index, v := range rg.Group {
if record.CommandKeyEqual(*v) {
if record.EqualByKey(*v, CommandKey) {
rg.Group = append(rg.Group[:index], rg.Group[index+1:]...)
deleted = true
break
Expand All @@ -453,25 +431,28 @@ func (rg *RecordGroup) ToBytes() ([]byte, error) {
return jsonData, nil
}

func (rg *RecordGroup) HasIndex(record Record) (bool, error) {
func (rg *RecordGroup) HitIndex(record Record) (bool, error) {
rg.lock.RLock()
defer rg.lock.RUnlock()

for _, v := range rg.Group {
if record.CommandKeyEqual(*v) {
return true, nil
if record.EqualByKey(*v, CommandKey) {
if record.EqualByKey(*v, RemoteExecuteTimeKey) {
return true, nil
}
break
}
}

return false, nil
}

func (rg *RecordGroup) HasResult(record Record) (bool, error) {
func (rg *RecordGroup) HitResult(record Record) (bool, error) {
rg.lock.RLock()
defer rg.lock.RUnlock()

for _, v := range rg.Group {
if record.ResultKeyEqual(*v) {
if record.EqualByKey(*v, ResultKey) {
return true, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func newExecutor(mgr *Mgr,
// TODO : 通过修改e.sandbox来影响e.handler,因为e.sandbox是个指针
// 这个方法目前是可用的,因为handler直接保存了该指针
e.initResultCacheInfo()
if e.hasLocalIndex || e.hasRemoteIndex {
if e.hitLocalIndex || e.hitRemoteIndex {
e.sandbox.Env.AppendEnv(env.KeyExecutorHasResultIndex, "true")
}

Expand Down Expand Up @@ -121,10 +121,10 @@ type executor struct {
commandKey string
preprocessResultKey string
remoteExecuteSecs int
hasLocalIndex bool
hasLocalResult bool
hasRemoteIndex bool
hasRemoteResult bool
hitLocalIndex bool
// hasLocalResult bool
hitRemoteIndex bool
// hasRemoteResult bool
}

// Stdout return the execution stdout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,21 @@ func (e *executor) initResultCacheInfo() {
e.commandKey = strings.Join(e.req.Commands, " ")

record := resultcache.Record{
resultcache.CommandKey: e.commandKey,
resultcache.CommandKey: e.commandKey,
resultcache.RemoteExecuteTimeKey: strconv.Itoa(e.remoteTriggleSecs),
}
if e.localCacheEnabled() {
e.hasLocalIndex, _ = e.mgr.hasLocalIndex(record)
e.hitLocalIndex, _ = e.mgr.hitLocalIndex(record)
}
if e.remoteCacheEnabled() {
e.hasRemoteIndex, _ = e.mgr.hasRemoteIndex(record)
e.hitRemoteIndex, _ = e.mgr.hitRemoteIndex(record)
}

blog.Infof("executor: got cache type:%d,cache group key:%s,command:[%s],"+
"haslocalindex:%v,hasremoteindex:%v"+
"hitLocalIndex:%v,hitRemoteIndex:%v,"+
"remoteTriggleSecs:%d",
e.cacheType, e.cacheGroupKey, e.commandKey,
e.hasLocalIndex, e.hasRemoteIndex,
e.hitLocalIndex, e.hitRemoteIndex,
e.remoteTriggleSecs)
}

Expand Down Expand Up @@ -316,7 +317,7 @@ func (e *executor) putCacheResult(r *dcSDK.BKDistResult, stat *dcSDK.ControllerJ
record := resultcache.Record{}

// local cache
if e.localCacheEnabled() && (e.hasLocalIndex || remoteTooLong) {
if e.localCacheEnabled() && (e.hitLocalIndex || remoteTooLong) {
// report result files
if e.preprocessResultKey != "" {
err := e.putLocalResultFiles(r)
Expand All @@ -339,7 +340,7 @@ func (e *executor) putCacheResult(r *dcSDK.BKDistResult, stat *dcSDK.ControllerJ
}

// remote cache
if e.remoteCacheEnabled() && (e.hasRemoteIndex || remoteTooLong) {
if e.remoteCacheEnabled() && (e.hitRemoteIndex || remoteTooLong) {
if len(record) == 0 {
record[resultcache.GroupKey] = e.cacheGroupKey
record[resultcache.CommandKey] = e.commandKey
Expand Down
16 changes: 8 additions & 8 deletions src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,48 +558,48 @@ func (m *Mgr) getRemoteResultCacheIndex() {
}
}

func (m *Mgr) hasLocalIndex(record resultcache.Record) (bool, error) {
func (m *Mgr) hitLocalIndex(record resultcache.Record) (bool, error) {
m.resultdata.lock.RLock()
defer m.resultdata.lock.RUnlock()

if m.resultdata.localGroupRecord == nil {
return false, nil
}

return m.resultdata.localGroupRecord.HasIndex(record)
return m.resultdata.localGroupRecord.HitIndex(record)
}

func (m *Mgr) hasRemoteIndex(record resultcache.Record) (bool, error) {
func (m *Mgr) hitRemoteIndex(record resultcache.Record) (bool, error) {
m.resultdata.lock.RLock()
defer m.resultdata.lock.RUnlock()

if m.resultdata.remoteGroupRecord == nil {
return false, nil
}

return m.resultdata.remoteGroupRecord.HasIndex(record)
return m.resultdata.remoteGroupRecord.HitIndex(record)
}

func (m *Mgr) hasLocalResult(record resultcache.Record) (bool, error) {
func (m *Mgr) hitLocalResult(record resultcache.Record) (bool, error) {
m.resultdata.lock.RLock()
defer m.resultdata.lock.RUnlock()

if m.resultdata.localGroupRecord == nil {
return false, nil
}

return m.resultdata.localGroupRecord.HasResult(record)
return m.resultdata.localGroupRecord.HitResult(record)
}

func (m *Mgr) hasRemoteResult(record resultcache.Record) (bool, error) {
func (m *Mgr) hitRemoteResult(record resultcache.Record) (bool, error) {
m.resultdata.lock.RLock()
defer m.resultdata.lock.RUnlock()

if m.resultdata.remoteGroupRecord == nil {
return false, nil
}

return m.resultdata.remoteGroupRecord.HasResult(record)
return m.resultdata.remoteGroupRecord.HitResult(record)
}

func (m *Mgr) getRemoteResultCacheFile(resultkey string) (*dcSDK.BKQueryResultCacheFileResult, error) {
Expand Down

0 comments on commit c150409

Please sign in to comment.