From eed529364ee75bf24c9972cfc650bb488dbce843 Mon Sep 17 00:00:00 2001 From: zyguan Date: Thu, 23 Jan 2025 09:39:03 +0000 Subject: [PATCH] client: refactor batch client send loop Signed-off-by: zyguan --- internal/client/client.go | 21 +++++++++++++++++++++ internal/client/client_batch.go | 32 +++++--------------------------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 73c80edadb..6af800bbd5 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -586,6 +586,27 @@ func (c *RPCClient) closeConns() { c.Unlock() } +func (c *RPCClient) recycleIdleConnArray() { + start := time.Now() + + var addrs []string + var vers []uint64 + c.RLock() + for _, conn := range c.conns { + if conn.batchConn != nil && conn.isIdle() { + addrs = append(addrs, conn.target) + vers = append(vers, conn.ver) + } + } + c.RUnlock() + + for i, addr := range addrs { + c.CloseAddrVer(addr, vers[i]) + } + + metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds()) +} + func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) { tikvrpc.AttachContext(req, req.Context) diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 0b23d72aab..e630fca348 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -538,6 +538,10 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { a.reqBuilder.reset() headRecvTime, headArrivalInterval := a.fetchAllPendingRequests(int(cfg.MaxBatchSize)) + if a.reqBuilder.len() == 0 { + // the conn is closed or recycled. + return + } // curl -X PUT -d 'return(true)' http://0.0.0.0:10080/fail/tikvclient/mockBlockOnBatchClient if val, err := util.EvalFailpoint("mockBlockOnBatchClient"); err == nil { @@ -558,13 +562,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) { } } length := a.reqBuilder.len() + avgBatchWaitSize = 0.2*float64(length) + 0.8*avgBatchWaitSize a.metrics.pendingRequests.Observe(float64(len(a.batchCommandsCh) + length)) - if uint(length) == 0 { - // The batch command channel is closed. - return - } else { - avgBatchWaitSize = 0.2*float64(length) + 0.8*avgBatchWaitSize - } a.metrics.bestBatchSize.Observe(avgBatchWaitSize) a.metrics.headArrivalInterval.Observe(headArrivalInterval.Seconds()) a.metrics.sendLoopWaitHeadDur.Observe(headRecvTime.Sub(sendLoopStartTime).Seconds()) @@ -1164,24 +1163,3 @@ func sendBatchRequest( return nil, errors.WithMessage(context.DeadlineExceeded, reason) } } - -func (c *RPCClient) recycleIdleConnArray() { - start := time.Now() - - var addrs []string - var vers []uint64 - c.RLock() - for _, conn := range c.conns { - if conn.batchConn != nil && conn.isIdle() { - addrs = append(addrs, conn.target) - vers = append(vers, conn.ver) - } - } - c.RUnlock() - - for i, addr := range addrs { - c.CloseAddrVer(addr, vers[i]) - } - - metrics.TiKVBatchClientRecycle.Observe(time.Since(start).Seconds()) -}