From 1d45cfd1f4b7c6d57ceac161f834158eb43e68c2 Mon Sep 17 00:00:00 2001
From: tbs60
Date: Mon, 20 Jan 2025 10:49:26 +0800
Subject: [PATCH] feat:support result cache, issue: #315

---
 .../bk_dist/common/resultcache/filemgr.go  |  2 +-
 .../bk_dist/common/resultcache/indexmgr.go |  2 +-
 .../controller/pkg/manager/local/mgr.go    |  2 +-
 .../handler4queryresultcacheindex.go       |  2 +-
 .../bk_dist/worker/pkg/manager/slotmgr.go  | 19 ++++++++++---------
 5 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/src/backend/booster/bk_dist/common/resultcache/filemgr.go b/src/backend/booster/bk_dist/common/resultcache/filemgr.go
index 70ef166ad..a1bee5469 100644
--- a/src/backend/booster/bk_dist/common/resultcache/filemgr.go
+++ b/src/backend/booster/bk_dist/common/resultcache/filemgr.go
@@ -164,7 +164,7 @@ func (f *FileMgrWithARC) ticker() {
 	for {
 		select {
 		case <-ticker.C:
-			blog.Infof("FileMgrWithARC: on ticker now...")
+			blog.Debugf("FileMgrWithARC: on ticker now...")
 			// 最合理的策略,是根据最后的访问时间决定是否可以删除;
 			// 但当我们去取这个时间时,我们就成了最后的访问者(当观察者介入,就成了被观察世界的一部分)
 			// 所以先简单的以该文件放入内存中的时间为标准
diff --git a/src/backend/booster/bk_dist/common/resultcache/indexmgr.go b/src/backend/booster/bk_dist/common/resultcache/indexmgr.go
index 10e15e141..58a6e888f 100644
--- a/src/backend/booster/bk_dist/common/resultcache/indexmgr.go
+++ b/src/backend/booster/bk_dist/common/resultcache/indexmgr.go
@@ -503,7 +503,7 @@ func (t *IndexMgrWithARC) ticker() {
 	for {
 		select {
 		case <-ticker.C:
-			blog.Infof("IndexMgrWithARC: on ticker now...")
+			blog.Debugf("IndexMgrWithARC: on ticker now...")
 			t.lock.RLock()
 			for _, rg := range t.Data {
 				if rg.LastStatus == StatusModified {
diff --git a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
index 06c62b37a..f2bb40508 100644
--- a/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
+++ b/src/backend/booster/bk_dist/controller/pkg/manager/local/mgr.go
@@ -664,7 +664,7 @@ func (m *Mgr) getRemoteResultCacheIndex(key string, host *protocol.Host) {
 		host,
 		record,
 	)
-	if err == nil && result != nil {
+	if err == nil && result != nil && len(result.ResultIndex) > 0 {
 		oneGroupRecord, _ := resultcache.ToRecordGroup(result.ResultIndex)
 		if oneGroupRecord != nil {
 			m.resultdata.lock.Lock()
diff --git a/src/backend/booster/bk_dist/worker/pkg/cmd_handler/handler4queryresultcacheindex.go b/src/backend/booster/bk_dist/worker/pkg/cmd_handler/handler4queryresultcacheindex.go
index 9f7d16dab..19489e977 100644
--- a/src/backend/booster/bk_dist/worker/pkg/cmd_handler/handler4queryresultcacheindex.go
+++ b/src/backend/booster/bk_dist/worker/pkg/cmd_handler/handler4queryresultcacheindex.go
@@ -93,7 +93,7 @@ func (h *Handle4QueryResultCacheIndex) Handle(
 	if err != nil {
 		blog.Errorf("failed to encode rsp to messages for error:%v", err)
 	}
-	blog.Infof("succeed to encode send file response to messages")
+	blog.Infof("succeed to encode query result cache index response to messages")
 
 	// send response
 	if client != nil {
diff --git a/src/backend/booster/bk_dist/worker/pkg/manager/slotmgr.go b/src/backend/booster/bk_dist/worker/pkg/manager/slotmgr.go
index 2be0ec4b7..25391a0af 100644
--- a/src/backend/booster/bk_dist/worker/pkg/manager/slotmgr.go
+++ b/src/backend/booster/bk_dist/worker/pkg/manager/slotmgr.go
@@ -94,10 +94,10 @@ func notifySlot() {
 
 // 触发条件: 1. 定时 2. 有任务结束 3. 有新的客户端查询进来 4. 已分配的slot超时未使用清理
 // 需要考虑各种异常情况:
-// 1. 发送通知失败,则该可用槽需要回收(计数器回退),同时将该客户端从等待的池子中删除
-// 2. 发送成功,但客户端这时没有任务了,则占了一个槽但没有用到,这种情况下根据超时来清理,比如分配了1分钟还没有任务过来,则清理掉,
-// 同时降低该客户端的优先级
-// 3. 客户端任务的端口是变化的,根据客户端的ip来对比(如果没有对比上,是否考虑直接丢弃?能否同时支持p2p和非p2p的任务?)
+//  1. 发送通知失败,则该可用槽需要回收(计数器回退),同时将该客户端从等待的池子中删除
+//  2. 发送成功,但客户端这时没有任务了,则占了一个槽但没有用到,这种情况下根据超时来清理,比如分配了1分钟还没有任务过来,则清理掉,
+//     同时降低该客户端的优先级
+//  3. 客户端任务的端口是变化的,根据客户端的ip来对比(如果没有对比上,是否考虑直接丢弃?能否同时支持p2p和非p2p的任务?)
 func (o *tcpManager) slotTimer() {
 	blog.Infof("[slotmgr] start estimate slot timer")
 	tick := time.NewTicker(estimateSlotIntervalTime)
@@ -325,13 +325,13 @@ func (o *tcpManager) receiveSlotRspAck(client *protocol.TCPClient) (*dcProtocol.
 // 检查客户端连接是否正常,如果异常了,则清理掉
 func (o *tcpManager) checkClient() {
-	blog.Infof("[slotmgr] check all client now")
+	blog.Debugf("[slotmgr] check all client now")
 
 	clientLock.Lock()
 	defer clientLock.Unlock()
 
 	needclean := false
 	for i := range clientCache {
-		blog.Infof("[slotmgr] check client %+v", *clientCache[i])
+		blog.Debugf("[slotmgr] check client %+v", *clientCache[i])
 		if clientCache[i].tcpclient.Closed() {
 			needclean = true
 			clientCache[i].valid = false
@@ -361,7 +361,7 @@
 // 检查分配的slot是否超时,需要考虑客户端发送工具链的额外时间,先将超时时间设置稍微大点
 // 超时的slot意味着什么? 后续是否继续为该客户端分配slot? worker端先不管,由客户端来检查和释放连接
 func (o *tcpManager) checkSlot() {
-	blog.Infof("[slotmgr] check slot now")
+	blog.Debugf("[slotmgr] check slot now")
 
 	slotLock.Lock()
 	defer slotLock.Unlock()
@@ -465,8 +465,9 @@ func (o *tcpManager) onTaskReceived(ip string) {
 }
 
 // 如果相应的客户端有发送文件过来,也可以认为该slot是激活的,超时时间可以往后推
-// 因为现在任务执行前需要发送依赖文件
-// 只激活超时时间的起始时间,不去掉预扣的slot
+//
+// 因为现在任务执行前需要发送依赖文件
+// 只激活超时时间的起始时间,不去掉预扣的slot
 func (o *tcpManager) onFileReceived(ip string) {
 	blog.Infof("[slotmgr] on file received from client:%s", ip)
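For reference, the slotTimer/checkSlot doc comments quoted above describe (in Chinese) how the worker manages pre-allocated slots: if a slot notification fails to send, the counter is rolled back and the client is dropped from the waiting pool; if it succeeds but the client no longer has a task, the allocation is reclaimed after a timeout (e.g. one minute) and the client's priority is lowered; clients are matched by IP because their task ports change; and receiving files from a client only restarts the timeout clock without releasing the pre-deducted slot. The standalone Go sketch below is a minimal model of that reclamation policy, not code from this patch: slotPool, allocatedSlot, reclaimExpired and onFileReceived here are hypothetical names, not bk_dist APIs.

package main

import (
	"fmt"
	"sync"
	"time"
)

// allocatedSlot records a slot handed to a client that has not yet sent a task.
type allocatedSlot struct {
	clientIP    string
	allocatedAt time.Time
	activatedAt time.Time // refreshed when the client sends files
}

// slotPool is a hypothetical stand-in for the worker's slot bookkeeping:
// a free counter plus per-client pre-allocations keyed by IP (ports change).
type slotPool struct {
	mu        sync.Mutex
	free      int
	allocated map[string]*allocatedSlot
	timeout   time.Duration // set above 1 minute to allow toolchain upload
}

// reclaimExpired rolls expired allocations back into the free counter and
// drops their records, matching points 1 and 2 of the slotTimer comment.
func (p *slotPool) reclaimExpired(now time.Time) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for ip, s := range p.allocated {
		if now.Sub(s.activatedAt) > p.timeout {
			delete(p.allocated, ip)
			p.free++ // counter rollback
			fmt.Printf("reclaimed slot of %s (allocated %v)\n", ip, s.allocatedAt)
			// a real implementation would also lower this client's priority here
		}
	}
}

// onFileReceived only restarts the timeout clock; the pre-deducted slot stays,
// mirroring the onFileReceived comment in slotmgr.go.
func (p *slotPool) onFileReceived(ip string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if s, ok := p.allocated[ip]; ok {
		s.activatedAt = time.Now()
	}
}

func main() {
	p := &slotPool{
		free:      3,
		allocated: make(map[string]*allocatedSlot),
		timeout:   90 * time.Second,
	}
	old := time.Now().Add(-2 * time.Minute)
	p.allocated["10.0.0.1"] = &allocatedSlot{clientIP: "10.0.0.1", allocatedAt: old, activatedAt: old}
	p.reclaimExpired(time.Now())
	fmt.Println("free slots:", p.free) // prints 4: the stale allocation was reclaimed
}

Choosing a timeout comfortably above one minute mirrors the checkSlot comment's advice to leave headroom for the client's dependency and toolchain upload before declaring a pre-allocated slot dead; the same idea motivates the mgr.go hunk above, which additionally guards against an empty ResultIndex before building a record group.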