Skip to content

Commit

Permalink
feat:support result cache, issue: TencentBlueKing#315
Browse files Browse the repository at this point in the history
  • Loading branch information
tbs60 committed Jan 20, 2025
1 parent 65467ae commit 1d45cfd
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/backend/booster/bk_dist/common/resultcache/filemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (f *FileMgrWithARC) ticker() {
for {
select {
case <-ticker.C:
blog.Infof("FileMgrWithARC: on ticker now...")
blog.Debugf("FileMgrWithARC: on ticker now...")
// 最合理的策略,是根据最后的访问时间决定是否可以删除;
// 但当我们去取这个时间时,我们就成了最后的访问者(当观察者介入,就成了被观察世界的一部分)
// 所以先简单的以该文件放入内存中的时间为标准
Expand Down
2 changes: 1 addition & 1 deletion src/backend/booster/bk_dist/common/resultcache/indexmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (t *IndexMgrWithARC) ticker() {
for {
select {
case <-ticker.C:
blog.Infof("IndexMgrWithARC: on ticker now...")
blog.Debugf("IndexMgrWithARC: on ticker now...")
t.lock.RLock()
for _, rg := range t.Data {
if rg.LastStatus == StatusModified {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (m *Mgr) getRemoteResultCacheIndex(key string, host *protocol.Host) {
host,
record,
)
if err == nil && result != nil {
if err == nil && result != nil && len(result.ResultIndex) > 0 {
oneGroupRecord, _ := resultcache.ToRecordGroup(result.ResultIndex)
if oneGroupRecord != nil {
m.resultdata.lock.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (h *Handle4QueryResultCacheIndex) Handle(
if err != nil {
blog.Errorf("failed to encode rsp to messages for error:%v", err)
}
blog.Infof("succeed to encode send file response to messages")
blog.Infof("succeed to encode query result cache index response to messages")

// send response
if client != nil {
Expand Down
19 changes: 10 additions & 9 deletions src/backend/booster/bk_dist/worker/pkg/manager/slotmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ func notifySlot() {

// 触发条件: 1. 定时 2. 有任务结束 3. 有新的客户端查询进来 4. 已分配的slot超时未使用清理
// 需要考虑各种异常情况:
// 1. 发送通知失败,则该可用槽需要回收(计数器回退),同时将该客户端从等待的池子中删除
// 2. 发送成功,但客户端这时没有任务了,则占了一个槽但没有用到,这种情况下根据超时来清理,比如分配了1分钟还没有任务过来,则清理掉,
// 同时降低该客户端的优先级
// 3. 客户端任务的端口是变化的,根据客户端的ip来对比(如果没有对比上,是否考虑直接丢弃?能否同时支持p2p和非p2p的任务?)
// 1. 发送通知失败,则该可用槽需要回收(计数器回退),同时将该客户端从等待的池子中删除
// 2. 发送成功,但客户端这时没有任务了,则占了一个槽但没有用到,这种情况下根据超时来清理,比如分配了1分钟还没有任务过来,则清理掉,
// 同时降低该客户端的优先级
// 3. 客户端任务的端口是变化的,根据客户端的ip来对比(如果没有对比上,是否考虑直接丢弃?能否同时支持p2p和非p2p的任务?)
func (o *tcpManager) slotTimer() {
blog.Infof("[slotmgr] start estimate slot timer")
tick := time.NewTicker(estimateSlotIntervalTime)
Expand Down Expand Up @@ -325,13 +325,13 @@ func (o *tcpManager) receiveSlotRspAck(client *protocol.TCPClient) (*dcProtocol.

// 检查客户端连接是否正常,如果异常了,则清理掉
func (o *tcpManager) checkClient() {
blog.Infof("[slotmgr] check all client now")
blog.Debugf("[slotmgr] check all client now")
clientLock.Lock()
defer clientLock.Unlock()

needclean := false
for i := range clientCache {
blog.Infof("[slotmgr] check client %+v", *clientCache[i])
blog.Debugf("[slotmgr] check client %+v", *clientCache[i])
if clientCache[i].tcpclient.Closed() {
needclean = true
clientCache[i].valid = false
Expand Down Expand Up @@ -361,7 +361,7 @@ func (o *tcpManager) checkClient() {
// 检查分配的slot是否超时,需要考虑客户端发送工具链的额外时间,先将超时时间设置稍微大点
// 超时的slot意味着什么? 后续是否继续为该客户端分配slot? worker端先不管,由客户端来检查和释放连接
func (o *tcpManager) checkSlot() {
blog.Infof("[slotmgr] check slot now")
blog.Debugf("[slotmgr] check slot now")

slotLock.Lock()
defer slotLock.Unlock()
Expand Down Expand Up @@ -465,8 +465,9 @@ func (o *tcpManager) onTaskReceived(ip string) {
}

// 如果相应的客户端有发送文件过来,也可以认为该slot是激活的,超时时间可以往后推
// 因为现在任务执行前需要发送依赖文件
// 只激活超时时间的起始时间,不去掉预扣的slot
//
// 因为现在任务执行前需要发送依赖文件
// 只激活超时时间的起始时间,不去掉预扣的slot
func (o *tcpManager) onFileReceived(ip string) {
blog.Infof("[slotmgr] on file received from client:%s", ip)

Expand Down

0 comments on commit 1d45cfd

Please sign in to comment.