From 75892ad8ee71e0a44b715b76b75dc943fc61f8ae Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 4 Sep 2024 22:37:43 +0800 Subject: [PATCH 01/21] feat: periodic updates of min_commit_ts and broadcast Signed-off-by: ekexium --- go.mod | 2 + go.sum | 4 +- tikvrpc/tikvrpc.go | 11 ++ txnkv/transaction/2pc.go | 178 ++++++++++++++++++++++++++++++-- txnkv/transaction/prewrite.go | 8 +- txnkv/transaction/test_probe.go | 6 +- 6 files changed, 191 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index f8bbc87b39..1bdb91c30f 100644 --- a/go.mod +++ b/go.mod @@ -59,3 +59,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/pingcap/kvproto => github.com/ekexium/kvproto v0.0.0-20240904135756-b4bbca7aeec2 diff --git a/go.sum b/go.sum index 74adbea16c..267e189f87 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/ekexium/kvproto v0.0.0-20240904135756-b4bbca7aeec2 h1:ugZxWz4IuaQuXV0V/xarYEuFEzhwOPLutp5q3/RIl60= +github.com/ekexium/kvproto v0.0.0-20240904135756-b4bbca7aeec2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -74,8 +76,6 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 350c5b7c78..5d66c94e6e 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -100,6 +100,7 @@ const ( CmdLockWaitInfo CmdGetHealthFeedback + CmdBroadcastTxnStatus CmdCop CmdType = 512 + iota CmdCopStream @@ -221,6 +222,8 @@ func (t CmdType) String() string { return "LockWaitInfo" case CmdGetHealthFeedback: return "GetHealthFeedback" + case CmdBroadcastTxnStatus: + return "BroadcastTxnStatus" case CmdFlashbackToVersion: return "FlashbackToVersion" case CmdPrepareFlashbackToVersion: @@ -568,6 +571,10 @@ func (req *Request) GetHealthFeedback() *kvrpcpb.GetHealthFeedbackRequest { return req.Req.(*kvrpcpb.GetHealthFeedbackRequest) } +func (req *Request) BroadcastTxnStatus() *kvrpcpb.BroadcastTxnStatusRequest { + return req.Req.(*kvrpcpb.BroadcastTxnStatusRequest) +} + // FlashbackToVersion returns FlashbackToVersionRequest in request. func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest { return req.Req.(*kvrpcpb.FlashbackToVersionRequest) @@ -653,6 +660,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BufferBatchGet{BufferBatchGet: req.BufferBatchGet()}} case CmdGetHealthFeedback: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_GetHealthFeedback{GetHealthFeedback: req.GetHealthFeedback()}} + case CmdBroadcastTxnStatus: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BroadcastTxnStatus{BroadcastTxnStatus: req.BroadcastTxnStatus()}} } return nil } @@ -1143,6 +1152,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp, err = client.KvBufferBatchGet(ctx, req.BufferBatchGet()) case CmdGetHealthFeedback: resp.Resp, err = client.GetHealthFeedback(ctx, req.GetHealthFeedback()) + case CmdBroadcastTxnStatus: + resp.Resp, err = client.BroadcastTxnStatus(ctx, req.BroadcastTxnStatus()) default: return nil, errors.Errorf("invalid request type: %v", req.Type) } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index a772a0e8e7..476f09b3ab 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -164,7 +164,7 @@ type twoPhaseCommitter struct { } useAsyncCommit uint32 - minCommitTS uint64 + minCommitTS *minCommitTsManager maxCommitTS uint64 prewriteStarted bool prewriteCancelled uint32 @@ -477,6 +477,7 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err binlog: txn.binlog, diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, resourceGroupName: txn.resourceGroupName, + minCommitTS: newMinCommitTsManager(), } return committer, nil } @@ -1140,6 +1141,78 @@ const ( stateClosed ) +// WriteAccessLevel represents the level of write access required to modify the value +type WriteAccessLevel int + +const ( + TTLAccess WriteAccessLevel = 1 + TwoPCAccess WriteAccessLevel = 2 +) + +// minCommitTsManager manages a minimum commit timestamp with different write access levels. +type minCommitTsManager struct { + mutex sync.Mutex + value uint64 + requiredWriteAccess WriteAccessLevel +} + +// newMinCommitTsManager creates and returns a new minCommitTsManager. +func newMinCommitTsManager() *minCommitTsManager { + return &minCommitTsManager{requiredWriteAccess: TTLAccess} +} + +// tryUpdate update the value if the provided write access level is sufficient and +// the new value is greater. +func (m *minCommitTsManager) tryUpdate(newValue uint64, writeAccess WriteAccessLevel) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if writeAccess < m.requiredWriteAccess { + return + } + + if newValue > m.value { + m.value = newValue + } +} + +// elevateWriteAccess elevates the required write access level. +// It returns the current value. +func (m *minCommitTsManager) elevateWriteAccess(newLevel WriteAccessLevel) uint64 { + m.mutex.Lock() + defer m.mutex.Unlock() + + if newLevel > m.requiredWriteAccess { + m.requiredWriteAccess = newLevel + } + return m.value +} + +// resetWriteAccess resets the required write access level to TTLAccess. +func (m *minCommitTsManager) resetWriteAccess() { + m.mutex.Lock() + defer m.mutex.Unlock() + + m.requiredWriteAccess = TTLAccess +} + +// get returns the current value. This is a read operation and doesn't require write access. +func (m *minCommitTsManager) get() uint64 { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.value +} + +// getRequiredWriteAccess returns the current required write access level. +func (m *minCommitTsManager) getRequiredWriteAccess() WriteAccessLevel { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.requiredWriteAccess +} + + type ttlManager struct { state ttlManagerState ch chan struct{} @@ -1178,7 +1251,10 @@ func (tm *ttlManager) reset() { const keepAliveMaxBackoff = 20000 const pessimisticLockMaxBackoff = 20000 const maxConsecutiveFailure = 10 +const broadcastMaxBackoff = 10000 +// keepAlive keeps sending heartbeat to update the primary key's TTL +// For pipelined transactions, it also updates min_commit_ts, and broadcasts it to all TiKVs. func keepAlive( c *twoPhaseCommitter, closeCh chan struct{}, tm *ttlManager, primaryKey []byte, lockCtx *kv.LockCtx, isPipelinedTxn bool, @@ -1237,6 +1313,11 @@ func keepAlive( return } + // Update minCommitTS + if isPipelinedTxn && c.minCommitTS.requiredWriteAccess <= TTLAccess { + c.minCommitTS.tryUpdate(now, TTLAccess) + } + newTTL := uptime + atomic.LoadUint64(&ManagedLockTTL) logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat", zap.Uint64("startTS", c.startTS), @@ -1244,7 +1325,9 @@ func keepAlive( zap.Bool("isPipelinedTxn", isPipelinedTxn), ) startTime := time.Now() - _, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL) + _, stopHeartBeat, err := sendTxnHeartBeat( + bo, c.store, primaryKey, c.startTS, newTTL, c.minCommitTS.get(), + ) if err != nil { keepFail++ metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds()) @@ -1268,19 +1351,95 @@ func keepAlive( } return } - continue + } else { + keepFail = 0 + metrics.TxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds()) + } + + // broadcast to all stores + if isPipelinedTxn { + broadcastToAllStores( + c.store, + c.store.GetRegionCache().GetStoresByType(tikvrpc.TiKV), + retry.NewBackofferWithVars( + context.Background(), + broadcastMaxBackoff, + c.txn.vars, + ), + c.startTS, + c.minCommitTS.get(), + ) } - keepFail = 0 - metrics.TxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds()) } } } -func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) { +const broadcastRpcTimeout = time.Millisecond * 500 +const broadcastMaxConcurrency = 10 + +// broadcasts to all stores to update the minCommitTS. Ignore errors. +func broadcastToAllStores(store kvstore, stores []*locate.Store, bo *retry.Backoffer, startTs, + minCommitTs uint64) { + status := kvrpcpb.TxnStatus{ + StartTs: startTs, + MinCommitTs: minCommitTs, + CommitTs: 0, + RolledBack: false, + } + req := tikvrpc.NewRequest( + tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{ + TxnStatus: []*kvrpcpb.TxnStatus{&status}, + }, + ) + + var wg sync.WaitGroup + errChan := make(chan error, len(stores)) + semaphore := make(chan struct{}, broadcastMaxConcurrency) + + for _, s := range stores { + wg.Add(1) + go func(addr string) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + _, err := store.GetTiKVClient().SendRequest( + bo.GetCtx(), + addr, + req, + broadcastRpcTimeout, + ) + if err != nil { + errChan <- err + } + }(s.GetAddr()) + } + + wg.Wait() + close(errChan) + + for err := range errChan { + logutil.Logger(bo.GetCtx()).Info( + "broadcast txn status failed", + zap.Uint64("startTs", startTs), + zap.Uint64("minCommitTs", minCommitTs), + zap.Error(err), + ) + } +} + +func sendTxnHeartBeat( + bo *retry.Backoffer, + store kvstore, + primary []byte, + startTS, ttl uint64, + minCommitTS uint64, +) (newTTL uint64, stopHeartBeat bool, err error) { req := tikvrpc.NewRequest(tikvrpc.CmdTxnHeartBeat, &kvrpcpb.TxnHeartBeatRequest{ PrimaryLock: primary, StartVersion: startTS, AdviseLockTtl: ttl, + MinCommitTs: minCommitTS, }) for { loc, err := store.GetRegionCache().LocateKey(bo, primary) @@ -1449,6 +1608,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) { // execute executes the two-phase commit protocol. func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { + c.minCommitTS.elevateWriteAccess(TwoPCAccess) var binlogSkipped bool defer func() { if c.isOnePC() { @@ -1552,7 +1712,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } commitDetail.GetLatestTsTime = time.Since(start) // Plus 1 to avoid producing the same commit TS with previously committed transactions - c.minCommitTS = latestTS + 1 + c.minCommitTS.tryUpdate(latestTS+1, TwoPCAccess) } // Calculate maxCommitTS if necessary if commitTSMayBeCalculated { @@ -1671,10 +1831,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } if c.isAsyncCommit() { - if c.minCommitTS == 0 { + if c.minCommitTS.get() == 0 { return errors.Errorf("session %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS) } - commitTS = c.minCommitTS + commitTS = c.minCommitTS.get() } else { start = time.Now() logutil.Event(ctx, "start get commit ts") diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index a50e407198..1238d1fd63 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -116,7 +116,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u } } c.mu.Lock() - minCommitTS := c.minCommitTS + minCommitTS := c.minCommitTS.get() c.mu.Unlock() if c.forUpdateTS > 0 && c.forUpdateTS >= minCommitTS { minCommitTS = c.forUpdateTS + 1 @@ -387,7 +387,7 @@ func (action actionPrewrite) handleSingleBatch( c.setOnePC(false) c.setAsyncCommit(false) } else { - // For 1PC, there's no racing to access to access `onePCCommmitTS` so it's safe + // For 1PC, there's no racing to access `onePCCommitTS` so it's safe // not to lock the mutex. if c.onePCCommitTS != 0 { logutil.Logger(bo.GetCtx()).Fatal( @@ -419,8 +419,8 @@ func (action actionPrewrite) handleSingleBatch( c.setAsyncCommit(false) } else { c.mu.Lock() - if prewriteResp.MinCommitTs > c.minCommitTS { - c.minCommitTS = prewriteResp.MinCommitTs + if prewriteResp.MinCommitTs > c.minCommitTS.get() { + c.minCommitTS.tryUpdate(prewriteResp.MinCommitTs, TwoPCAccess) } c.mu.Unlock() } diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index 713502db41..e040bb6301 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -194,12 +194,12 @@ func (c CommitterProbe) GetCommitTS() uint64 { // GetMinCommitTS returns the minimal commit ts can be used. func (c CommitterProbe) GetMinCommitTS() uint64 { - return c.minCommitTS + return c.minCommitTS.get() } // SetMinCommitTS sets the minimal commit ts can be used. func (c CommitterProbe) SetMinCommitTS(ts uint64) { - c.minCommitTS = ts + c.minCommitTS.tryUpdate(ts, TwoPCAccess) } // SetMaxCommitTS sets the max commit ts can be used. @@ -376,7 +376,7 @@ func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []by // SendTxnHeartBeat renews a txn's ttl. func SendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) { - return sendTxnHeartBeat(bo, store, primary, startTS, ttl) + return sendTxnHeartBeat(bo, store, primary, startTS, ttl, 0) } // ConfigProbe exposes configurations and global variables for testing purpose. From 5588fdb4699174be3a176f7641c22fb61098ed39 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 5 Sep 2024 16:09:48 +0800 Subject: [PATCH 02/21] feat: periodic updates of full store list Signed-off-by: ekexium --- internal/locate/region_cache.go | 38 +++++++++++++++++++++++++++++++++ tikv/gc.go | 4 ++-- txnkv/transaction/2pc.go | 7 +++--- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index ef575f8347..36d13cbfa0 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -726,9 +726,46 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { // cache GC is incompatible with cache refresh c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval) } + c.bg.schedule( + func(ctx context.Context, _ time.Time) bool { + refreshFullStoreList(ctx, c.PDClient(), c.stores) + return false + }, refreshStoreListInterval, + ) return c } +// Try to refresh full store list. Errors are ignored. +func refreshFullStoreList(ctx context.Context, pdClient pd.Client, stores storeCache) { + storeList, err := pdClient.GetAllStores(ctx) + if err != nil { + logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err)) + return + } + for _, store := range storeList { + s, exist := stores.get(store.GetId()) + if exist { + continue + } + s = stores.getOrInsertDefault(store.GetId()) + if store == nil || store.GetState() == metapb.StoreState_Tombstone { + s.setResolveState(tombstone) + return + } + addr := store.GetAddress() + if addr == "" { + return + } + // TODO: maybe refactor this, together with other places initializing Store + s.addr = addr + s.peerAddr = store.GetPeerAddress() + s.saddr = store.GetStatusAddress() + s.storeType = tikvrpc.GetStoreTypeByMeta(store) + s.labels = store.GetLabels() + s.changeResolveStateTo(unresolved, resolved) + } +} + // only used fot test. func newTestRegionCache() *RegionCache { c := &RegionCache{} @@ -2649,6 +2686,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV const cleanCacheInterval = time.Second const cleanRegionNumPerRound = 50 +const refreshStoreListInterval = time.Second // gcScanItemHook is only used for testing var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)]) diff --git a/tikv/gc.go b/tikv/gc.go index a3b4eefcea..da0022f33b 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -309,7 +309,7 @@ const unsafeDestroyRangeTimeout = 5 * time.Minute // multiple times on an single range. func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKey []byte) error { // Get all stores every time deleting a region. So the store list is less probably to be stale. - stores, err := s.listStoresForUnsafeDestory(ctx) + stores, err := s.listStoresForUnsafeDestroy(ctx) if err != nil { metrics.TiKVUnsafeDestroyRangeFailuresCounterVec.WithLabelValues("get_stores").Inc() return err @@ -366,7 +366,7 @@ func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKe return nil } -func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Store, error) { +func (s *KVStore) listStoresForUnsafeDestroy(ctx context.Context) ([]*metapb.Store, error) { stores, err := s.pdClient.GetAllStores(ctx) if err != nil { return nil, errors.WithStack(err) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 476f09b3ab..6e4902ddc9 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -164,7 +164,7 @@ type twoPhaseCommitter struct { } useAsyncCommit uint32 - minCommitTS *minCommitTsManager + minCommitTS *minCommitTsManager maxCommitTS uint64 prewriteStarted bool prewriteCancelled uint32 @@ -477,7 +477,7 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err binlog: txn.binlog, diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, resourceGroupName: txn.resourceGroupName, - minCommitTS: newMinCommitTsManager(), + minCommitTS: newMinCommitTsManager(), } return committer, nil } @@ -1212,7 +1212,6 @@ func (m *minCommitTsManager) getRequiredWriteAccess() WriteAccessLevel { return m.requiredWriteAccess } - type ttlManager struct { state ttlManagerState ch chan struct{} @@ -1439,7 +1438,7 @@ func sendTxnHeartBeat( PrimaryLock: primary, StartVersion: startTS, AdviseLockTtl: ttl, - MinCommitTs: minCommitTS, + MinCommitTs: minCommitTS, }) for { loc, err := store.GetRegionCache().LocateKey(bo, primary) From 294e6145128363ac4c024fadb235f77f184823d3 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 5 Sep 2024 19:00:10 +0800 Subject: [PATCH 03/21] test: unit test for minCommitTsManager Signed-off-by: ekexium --- txnkv/transaction/2pc.go | 22 +++---- txnkv/transaction/2pc_test.go | 104 ++++++++++++++++++++++++++++++++ txnkv/transaction/prewrite.go | 2 +- txnkv/transaction/test_probe.go | 2 +- 4 files changed, 113 insertions(+), 17 deletions(-) create mode 100644 txnkv/transaction/2pc_test.go diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 6e4902ddc9..c5ec473eb8 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1145,8 +1145,8 @@ const ( type WriteAccessLevel int const ( - TTLAccess WriteAccessLevel = 1 - TwoPCAccess WriteAccessLevel = 2 + ttlAccess WriteAccessLevel = 1 + twoPCAccess WriteAccessLevel = 2 ) // minCommitTsManager manages a minimum commit timestamp with different write access levels. @@ -1158,7 +1158,7 @@ type minCommitTsManager struct { // newMinCommitTsManager creates and returns a new minCommitTsManager. func newMinCommitTsManager() *minCommitTsManager { - return &minCommitTsManager{requiredWriteAccess: TTLAccess} + return &minCommitTsManager{requiredWriteAccess: ttlAccess} } // tryUpdate update the value if the provided write access level is sufficient and @@ -1188,14 +1188,6 @@ func (m *minCommitTsManager) elevateWriteAccess(newLevel WriteAccessLevel) uint6 return m.value } -// resetWriteAccess resets the required write access level to TTLAccess. -func (m *minCommitTsManager) resetWriteAccess() { - m.mutex.Lock() - defer m.mutex.Unlock() - - m.requiredWriteAccess = TTLAccess -} - // get returns the current value. This is a read operation and doesn't require write access. func (m *minCommitTsManager) get() uint64 { m.mutex.Lock() @@ -1313,8 +1305,8 @@ func keepAlive( } // Update minCommitTS - if isPipelinedTxn && c.minCommitTS.requiredWriteAccess <= TTLAccess { - c.minCommitTS.tryUpdate(now, TTLAccess) + if isPipelinedTxn && c.minCommitTS.getRequiredWriteAccess() <= ttlAccess { + c.minCommitTS.tryUpdate(now, ttlAccess) } newTTL := uptime + atomic.LoadUint64(&ManagedLockTTL) @@ -1607,7 +1599,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) { // execute executes the two-phase commit protocol. func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { - c.minCommitTS.elevateWriteAccess(TwoPCAccess) + c.minCommitTS.elevateWriteAccess(twoPCAccess) var binlogSkipped bool defer func() { if c.isOnePC() { @@ -1711,7 +1703,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } commitDetail.GetLatestTsTime = time.Since(start) // Plus 1 to avoid producing the same commit TS with previously committed transactions - c.minCommitTS.tryUpdate(latestTS+1, TwoPCAccess) + c.minCommitTS.tryUpdate(latestTS+1, twoPCAccess) } // Calculate maxCommitTS if necessary if commitTSMayBeCalculated { diff --git a/txnkv/transaction/2pc_test.go b/txnkv/transaction/2pc_test.go new file mode 100644 index 0000000000..a7b901b3a8 --- /dev/null +++ b/txnkv/transaction/2pc_test.go @@ -0,0 +1,104 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// NOTE: The code in this file is based on code from the +// TiDB project, licensed under the Apache License v 2.0 +// +// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/snapshot_test.go +// + +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transaction + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + + +func TestMinCommitTsManager(t *testing.T) { + t.Run("Initial state", func(t *testing.T) { + manager := newMinCommitTsManager() + assert.Equal(t, uint64(0), manager.get(), "Initial value should be 0") + assert.Equal(t, ttlAccess, manager.getRequiredWriteAccess(), "Initial write access should be ttlAccess") + }) + + t.Run("TTL updates", func(t *testing.T) { + manager := newMinCommitTsManager() + + manager.tryUpdate(10, ttlAccess) + assert.Equal(t, uint64(10), manager.get(), "Value should be 10") + + manager.tryUpdate(5, ttlAccess) + assert.Equal(t, uint64(10), manager.get(), "Value should remain 10") + }) + + t.Run("Elevate write access", func(t *testing.T) { + manager := newMinCommitTsManager() + manager.tryUpdate(10, ttlAccess) + + currentValue := manager.elevateWriteAccess(twoPCAccess) + assert.Equal(t, uint64(10), currentValue, "Current value should be 10") + assert.Equal(t, twoPCAccess, manager.getRequiredWriteAccess(), "Required write access should be twoPCAccess") + }) + + t.Run("Updates after elevation", func(t *testing.T) { + manager := newMinCommitTsManager() + manager.tryUpdate(10, ttlAccess) + manager.elevateWriteAccess(twoPCAccess) + + manager.tryUpdate(20, ttlAccess) + assert.Equal(t, uint64(10), manager.get(), "Value should remain 10") + + manager.tryUpdate(30, twoPCAccess) + assert.Equal(t, uint64(30), manager.get(), "Value should be 30") + }) + + t.Run("Concurrent updates", func(t *testing.T) { + manager := newMinCommitTsManager() + done := make(chan bool) + + go func() { + for i := 0; i < 1000; i++ { + manager.tryUpdate(uint64(i), ttlAccess) + } + done <- true + }() + + go func() { + for i := 0; i < 1000; i++ { + manager.tryUpdate(uint64(1000+i), ttlAccess) + } + done <- true + }() + + <-done + <-done + + assert.Equal(t, manager.get(), uint64(1999),) + }) +} \ No newline at end of file diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 1238d1fd63..2b06703ce3 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -420,7 +420,7 @@ func (action actionPrewrite) handleSingleBatch( } else { c.mu.Lock() if prewriteResp.MinCommitTs > c.minCommitTS.get() { - c.minCommitTS.tryUpdate(prewriteResp.MinCommitTs, TwoPCAccess) + c.minCommitTS.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess) } c.mu.Unlock() } diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index e040bb6301..1bcb2791a3 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -199,7 +199,7 @@ func (c CommitterProbe) GetMinCommitTS() uint64 { // SetMinCommitTS sets the minimal commit ts can be used. func (c CommitterProbe) SetMinCommitTS(ts uint64) { - c.minCommitTS.tryUpdate(ts, TwoPCAccess) + c.minCommitTS.tryUpdate(ts, twoPCAccess) } // SetMaxCommitTS sets the max commit ts can be used. From 516aa224caf2e3f06639080509e18869ddb47975 Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 6 Sep 2024 00:23:33 +0800 Subject: [PATCH 04/21] fix: set cluster id for broadcast requests Signed-off-by: ekexium --- txnkv/transaction/2pc.go | 1 + 1 file changed, 1 insertion(+) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index c5ec473eb8..cc9dd50db9 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1382,6 +1382,7 @@ func broadcastToAllStores(store kvstore, stores []*locate.Store, bo *retry.Backo TxnStatus: []*kvrpcpb.TxnStatus{&status}, }, ) + req.Context.ClusterId = store.GetClusterID() var wg sync.WaitGroup errChan := make(chan error, len(stores)) From fba423c1b0619e61314b3b4ced620441381b50d2 Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 6 Sep 2024 12:16:39 +0800 Subject: [PATCH 05/21] fix: set resource group related context Signed-off-by: ekexium --- txnkv/transaction/2pc.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index cc9dd50db9..77ceffc634 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1359,6 +1359,8 @@ func keepAlive( ), c.startTS, c.minCommitTS.get(), + c.resourceGroupName, + c.resourceGroupTag, ) } } @@ -1369,8 +1371,15 @@ const broadcastRpcTimeout = time.Millisecond * 500 const broadcastMaxConcurrency = 10 // broadcasts to all stores to update the minCommitTS. Ignore errors. -func broadcastToAllStores(store kvstore, stores []*locate.Store, bo *retry.Backoffer, startTs, - minCommitTs uint64) { +func broadcastToAllStores( + store kvstore, + stores []*locate.Store, + bo *retry.Backoffer, + startTs uint64, + minCommitTs uint64, + resourceGroupName string, + resourceGroupTag []byte, +) { status := kvrpcpb.TxnStatus{ StartTs: startTs, MinCommitTs: minCommitTs, @@ -1383,6 +1392,10 @@ func broadcastToAllStores(store kvstore, stores []*locate.Store, bo *retry.Backo }, ) req.Context.ClusterId = store.GetClusterID() + req.Context.ResourceControlContext = &kvrpcpb.ResourceControlContext{ + ResourceGroupName: resourceGroupName, + } + req.Context.ResourceGroupTag = resourceGroupTag var wg sync.WaitGroup errChan := make(chan error, len(stores)) From 5adc6c937d43e774303a6ca2bed7af75418bf02b Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 11 Sep 2024 12:21:06 +0800 Subject: [PATCH 06/21] limit the update to non-async-commit and non-1pc txns Signed-off-by: ekexium --- txnkv/transaction/2pc.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 77ceffc634..9034fb6370 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1304,8 +1304,11 @@ func keepAlive( return } - // Update minCommitTS - if isPipelinedTxn && c.minCommitTS.getRequiredWriteAccess() <= ttlAccess { + // update minCommitTS, if it's a non-async-commit pipelined transaction + if isPipelinedTxn && + !c.isOnePC() && + !c.isAsyncCommit() && + c.minCommitTS.getRequiredWriteAccess() <= ttlAccess { c.minCommitTS.tryUpdate(now, ttlAccess) } From 61545b03211d71e11daea8d9faf220bf1b5fd2c7 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 18 Sep 2024 19:07:43 +0800 Subject: [PATCH 07/21] broadcast txn status when commit and rollback Signed-off-by: ekexium --- go.mod | 2 +- go.sum | 4 +-- txnkv/transaction/2pc.go | 28 +++++++-------- txnkv/transaction/2pc_test.go | 5 ++- txnkv/transaction/pipelined_flush.go | 52 ++++++++++++++++++++++++++-- txnkv/transaction/txn.go | 20 ++++++++++- 6 files changed, 86 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 1bdb91c30f..96f3d30010 100644 --- a/go.mod +++ b/go.mod @@ -60,4 +60,4 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -replace github.com/pingcap/kvproto => github.com/ekexium/kvproto v0.0.0-20240904135756-b4bbca7aeec2 +replace github.com/pingcap/kvproto => github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37 diff --git a/go.sum b/go.sum index 267e189f87..71cd91e747 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/ekexium/kvproto v0.0.0-20240904135756-b4bbca7aeec2 h1:ugZxWz4IuaQuXV0V/xarYEuFEzhwOPLutp5q3/RIl60= -github.com/ekexium/kvproto v0.0.0-20240904135756-b4bbca7aeec2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37 h1:wwoMiZm/cK9466IX3QUPgogxH+HsTuAVAouHLlBJLh0= +github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 9034fb6370..fd10e6064d 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1242,6 +1242,7 @@ func (tm *ttlManager) reset() { const keepAliveMaxBackoff = 20000 const pessimisticLockMaxBackoff = 20000 const maxConsecutiveFailure = 10 +const broadcastGracePeriod = 5 * time.Second const broadcastMaxBackoff = 10000 // keepAlive keeps sending heartbeat to update the primary key's TTL @@ -1354,14 +1355,18 @@ func keepAlive( if isPipelinedTxn { broadcastToAllStores( c.store, - c.store.GetRegionCache().GetStoresByType(tikvrpc.TiKV), retry.NewBackofferWithVars( context.Background(), broadcastMaxBackoff, c.txn.vars, ), - c.startTS, - c.minCommitTS.get(), + &kvrpcpb.TxnStatus{ + StartTs: c.startTS, + MinCommitTs: c.minCommitTS.get(), + CommitTs: 0, + RolledBack: false, + IsCompleted: false, + }, c.resourceGroupName, c.resourceGroupTag, ) @@ -1376,22 +1381,15 @@ const broadcastMaxConcurrency = 10 // broadcasts to all stores to update the minCommitTS. Ignore errors. func broadcastToAllStores( store kvstore, - stores []*locate.Store, bo *retry.Backoffer, - startTs uint64, - minCommitTs uint64, + status *kvrpcpb.TxnStatus, resourceGroupName string, resourceGroupTag []byte, ) { - status := kvrpcpb.TxnStatus{ - StartTs: startTs, - MinCommitTs: minCommitTs, - CommitTs: 0, - RolledBack: false, - } + stores := store.GetRegionCache().GetStoresByType(tikvrpc.TiKV) req := tikvrpc.NewRequest( tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{ - TxnStatus: []*kvrpcpb.TxnStatus{&status}, + TxnStatus: []*kvrpcpb.TxnStatus{status}, }, ) req.Context.ClusterId = store.GetClusterID() @@ -1429,8 +1427,7 @@ func broadcastToAllStores( for err := range errChan { logutil.Logger(bo.GetCtx()).Info( "broadcast txn status failed", - zap.Uint64("startTs", startTs), - zap.Uint64("minCommitTs", minCommitTs), + zap.Stringer("status", status), zap.Error(err), ) } @@ -1595,6 +1592,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) { var err error if c.txn.IsPipelined() { // TODO: cleanup pipelined txn + // TODO: broadcast txn status } else if !c.isOnePC() { err = c.cleanupMutations(retry.NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) } else if c.isPessimistic { diff --git a/txnkv/transaction/2pc_test.go b/txnkv/transaction/2pc_test.go index a7b901b3a8..3d614bc569 100644 --- a/txnkv/transaction/2pc_test.go +++ b/txnkv/transaction/2pc_test.go @@ -39,7 +39,6 @@ import ( "testing" ) - func TestMinCommitTsManager(t *testing.T) { t.Run("Initial state", func(t *testing.T) { manager := newMinCommitTsManager() @@ -99,6 +98,6 @@ func TestMinCommitTsManager(t *testing.T) { <-done <-done - assert.Equal(t, manager.get(), uint64(1999),) + assert.Equal(t, manager.get(), uint64(1999)) }) -} \ No newline at end of file +} diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 3ba39d279c..ac449f8250 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -338,6 +338,23 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", commitTS), ) + broadcastToAllStores( + c.store, + retry.NewBackofferWithVars( + bo.GetCtx(), + broadcastMaxBackoff, + c.txn.vars, + ), + &kvrpcpb.TxnStatus{ + StartTs: c.startTS, + MinCommitTs: c.minCommitTS.get(), + CommitTs: commitTS, + RolledBack: false, + IsCompleted: false, + }, + c.resourceGroupName, + c.resourceGroupTag, + ) if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil { return nil @@ -439,13 +456,19 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end const RESOLVE_CONCURRENCY = 8 var resolved atomic.Uint64 handler, err := c.buildPipelinedResolveHandler(commit, &resolved) + var commitTs uint64 + if commit { + commitTs = atomic.LoadUint64(&c.commitTS) + } else { + commitTs = 0 + } if err != nil { logutil.Logger(bo.GetCtx()).Error( "[pipelined dml] build buildPipelinedResolveHandler error", zap.Error(err), zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), - zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("commitTS", commitTs), ) return } @@ -470,7 +493,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end zap.String("txn-status", status), zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), - zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("commitTS", commitTs), zap.Uint64("session", c.sessionID), zap.Error(err), ) @@ -479,9 +502,32 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end zap.String("txn-status", status), zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), - zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("commitTS", commitTs), zap.Uint64("session", c.sessionID), ) + + // wait a while before notifying txn_status_cache to evict the txn, + // which tolerates slow followers and avoids the situation that the + // txn is evicted before the follower catches up. + time.Sleep(broadcastGracePeriod) + + broadcastToAllStores( + c.store, + retry.NewBackofferWithVars( + bo.GetCtx(), + broadcastMaxBackoff, + c.txn.vars, + ), + &kvrpcpb.TxnStatus{ + StartTs: c.startTS, + MinCommitTs: 0, + CommitTs: commitTs, + RolledBack: !commit, + IsCompleted: true, + }, + c.resourceGroupName, + c.resourceGroupTag, + ) } }() } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 674a242a1b..d32cc19749 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -795,7 +795,25 @@ func (txn *KVTxn) Rollback() error { txn.committer.ttlManager.close() // no need to clean up locks when no flush triggered. pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd - if len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 { + needCleanUpLocks := len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 + broadcastToAllStores( + txn.committer.store, + retry.NewBackofferWithVars( + txn.store.Ctx(), + broadcastMaxBackoff, + txn.committer.txn.vars, + ), + &kvrpcpb.TxnStatus{ + StartTs: txn.startTS, + MinCommitTs: txn.committer.minCommitTS.get(), + CommitTs: 0, + RolledBack: true, + IsCompleted: !needCleanUpLocks, + }, + txn.resourceGroupName, + txn.resourceGroupTag, + ) + if needCleanUpLocks { rollbackBo := retry.NewBackofferWithVars(txn.store.Ctx(), CommitSecondaryMaxBackoff, txn.vars) txn.committer.resolveFlushedLocks(rollbackBo, pipelinedStart, pipelinedEnd, false) } From f3909ead431ff7a68628b00cadc70a089e3bbf7e Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 19 Sep 2024 14:58:43 +0800 Subject: [PATCH 08/21] rename minCommiTS to minCommitTSMgr; refactor broadcastToAllStores Signed-off-by: ekexium --- txnkv/transaction/2pc.go | 60 +++++++++++++++------------- txnkv/transaction/pipelined_flush.go | 2 +- txnkv/transaction/prewrite.go | 8 ++-- txnkv/transaction/test_probe.go | 4 +- txnkv/transaction/txn.go | 2 +- 5 files changed, 40 insertions(+), 36 deletions(-) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index fd10e6064d..d64ad02945 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -164,7 +164,7 @@ type twoPhaseCommitter struct { } useAsyncCommit uint32 - minCommitTS *minCommitTsManager + minCommitTSMgr *minCommitTsManager maxCommitTS uint64 prewriteStarted bool prewriteCancelled uint32 @@ -477,7 +477,7 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err binlog: txn.binlog, diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, resourceGroupName: txn.resourceGroupName, - minCommitTS: newMinCommitTsManager(), + minCommitTSMgr: newMinCommitTsManager(), } return committer, nil } @@ -1305,12 +1305,12 @@ func keepAlive( return } - // update minCommitTS, if it's a non-async-commit pipelined transaction + // update minCommitTSMgr, if it's a non-async-commit pipelined transaction if isPipelinedTxn && !c.isOnePC() && !c.isAsyncCommit() && - c.minCommitTS.getRequiredWriteAccess() <= ttlAccess { - c.minCommitTS.tryUpdate(now, ttlAccess) + c.minCommitTSMgr.getRequiredWriteAccess() <= ttlAccess { + c.minCommitTSMgr.tryUpdate(now, ttlAccess) } newTTL := uptime + atomic.LoadUint64(&ManagedLockTTL) @@ -1321,7 +1321,7 @@ func keepAlive( ) startTime := time.Now() _, stopHeartBeat, err := sendTxnHeartBeat( - bo, c.store, primaryKey, c.startTS, newTTL, c.minCommitTS.get(), + bo, c.store, primaryKey, c.startTS, newTTL, c.minCommitTSMgr.get(), ) if err != nil { keepFail++ @@ -1362,7 +1362,7 @@ func keepAlive( ), &kvrpcpb.TxnStatus{ StartTs: c.startTS, - MinCommitTs: c.minCommitTS.get(), + MinCommitTs: c.minCommitTSMgr.get(), CommitTs: 0, RolledBack: false, IsCompleted: false, @@ -1378,7 +1378,7 @@ func keepAlive( const broadcastRpcTimeout = time.Millisecond * 500 const broadcastMaxConcurrency = 10 -// broadcasts to all stores to update the minCommitTS. Ignore errors. +// broadcasts to all stores to update the minCommitTSMgr. Ignore errors. func broadcastToAllStores( store kvstore, bo *retry.Backoffer, @@ -1400,27 +1400,31 @@ func broadcastToAllStores( var wg sync.WaitGroup errChan := make(chan error, len(stores)) - semaphore := make(chan struct{}, broadcastMaxConcurrency) + taskChan := make(chan *locate.Store, len(stores)) - for _, s := range stores { + for i := 0; i < broadcastMaxConcurrency; i++ { wg.Add(1) - go func(addr string) { + go func() { defer wg.Done() - semaphore <- struct{}{} - defer func() { <-semaphore }() - - _, err := store.GetTiKVClient().SendRequest( - bo.GetCtx(), - addr, - req, - broadcastRpcTimeout, - ) - if err != nil { - errChan <- err + for s := range taskChan { + _, err := store.GetTiKVClient().SendRequest( + bo.GetCtx(), + s.GetAddr(), + req, + broadcastRpcTimeout, + ) + if err != nil { + errChan <- err + } } - }(s.GetAddr()) + }() + } + + for _, s := range stores { + taskChan <- s } + close(taskChan) wg.Wait() close(errChan) @@ -1614,7 +1618,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) { // execute executes the two-phase commit protocol. func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { - c.minCommitTS.elevateWriteAccess(twoPCAccess) + c.minCommitTSMgr.elevateWriteAccess(twoPCAccess) var binlogSkipped bool defer func() { if c.isOnePC() { @@ -1718,7 +1722,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } commitDetail.GetLatestTsTime = time.Since(start) // Plus 1 to avoid producing the same commit TS with previously committed transactions - c.minCommitTS.tryUpdate(latestTS+1, twoPCAccess) + c.minCommitTSMgr.tryUpdate(latestTS+1, twoPCAccess) } // Calculate maxCommitTS if necessary if commitTSMayBeCalculated { @@ -1837,10 +1841,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } if c.isAsyncCommit() { - if c.minCommitTS.get() == 0 { - return errors.Errorf("session %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS) + if c.minCommitTSMgr.get() == 0 { + return errors.Errorf("session %d invalid minCommitTSMgr for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS) } - commitTS = c.minCommitTS.get() + commitTS = c.minCommitTSMgr.get() } else { start = time.Now() logutil.Event(ctx, "start get commit ts") diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index ac449f8250..945f959eed 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -347,7 +347,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { ), &kvrpcpb.TxnStatus{ StartTs: c.startTS, - MinCommitTs: c.minCommitTS.get(), + MinCommitTs: c.minCommitTSMgr.get(), CommitTs: commitTS, RolledBack: false, IsCompleted: false, diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index 2b06703ce3..f8edfb0233 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -116,7 +116,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u } } c.mu.Lock() - minCommitTS := c.minCommitTS.get() + minCommitTS := c.minCommitTSMgr.get() c.mu.Unlock() if c.forUpdateTS > 0 && c.forUpdateTS >= minCommitTS { minCommitTS = c.forUpdateTS + 1 @@ -413,14 +413,14 @@ func (action actionPrewrite) handleSingleBatch( return nil } logutil.Logger(bo.GetCtx()).Warn( - "async commit cannot proceed since the returned minCommitTS is zero, "+ + "async commit cannot proceed since the returned minCommitTSMgr is zero, "+ "fallback to normal path", zap.Uint64("startTS", c.startTS), ) c.setAsyncCommit(false) } else { c.mu.Lock() - if prewriteResp.MinCommitTs > c.minCommitTS.get() { - c.minCommitTS.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess) + if prewriteResp.MinCommitTs > c.minCommitTSMgr.get() { + c.minCommitTSMgr.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess) } c.mu.Unlock() } diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index 1bcb2791a3..782922e3b2 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -194,12 +194,12 @@ func (c CommitterProbe) GetCommitTS() uint64 { // GetMinCommitTS returns the minimal commit ts can be used. func (c CommitterProbe) GetMinCommitTS() uint64 { - return c.minCommitTS.get() + return c.minCommitTSMgr.get() } // SetMinCommitTS sets the minimal commit ts can be used. func (c CommitterProbe) SetMinCommitTS(ts uint64) { - c.minCommitTS.tryUpdate(ts, twoPCAccess) + c.minCommitTSMgr.tryUpdate(ts, twoPCAccess) } // SetMaxCommitTS sets the max commit ts can be used. diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index d32cc19749..ac04544869 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -805,7 +805,7 @@ func (txn *KVTxn) Rollback() error { ), &kvrpcpb.TxnStatus{ StartTs: txn.startTS, - MinCommitTs: txn.committer.minCommitTS.get(), + MinCommitTs: txn.committer.minCommitTSMgr.get(), CommitTs: 0, RolledBack: true, IsCompleted: !needCleanUpLocks, From ad0302eef1bf688a90020b52884ed64437844b95 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 19 Sep 2024 15:21:40 +0800 Subject: [PATCH 09/21] fix test Signed-off-by: ekexium --- internal/locate/region_request_test.go | 4 ++++ tikv/kv_test.go | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 8e10e30b85..c43aecb087 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -582,6 +582,10 @@ func (s *mockTikvGrpcServer) GetHealthFeedback(ctx context.Context, request *kvr return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) BroadcastTxnStatus(ctx context.Context, request *kvrpcpb.BroadcastTxnStatusRequest) (*kvrpcpb.BroadcastTxnStatusResponse, error) { + return nil, errors.New("unreachable") +} + func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() { // prepare a mock tikv grpc server addr := "localhost:56341" diff --git a/tikv/kv_test.go b/tikv/kv_test.go index c2e4631e27..1a9a4618fe 100644 --- a/tikv/kv_test.go +++ b/tikv/kv_test.go @@ -51,7 +51,6 @@ type testKVSuite struct { func (s *testKVSuite) SetupTest() { client, cluster, pdClient, err := testutils.NewMockTiKV("", nil) s.Require().Nil(err) - testutils.BootstrapWithSingleStore(cluster) s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) { return 0, nil, nil }) From a98a44307f2a443add99d4066d10dd3817046d7b Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 19 Sep 2024 20:04:14 +0800 Subject: [PATCH 10/21] Update txnkv/transaction/2pc.go Co-authored-by: you06 --- txnkv/transaction/2pc.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 90c70810f3..704859718b 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1397,9 +1397,10 @@ func broadcastToAllStores( var wg sync.WaitGroup errChan := make(chan error, len(stores)) - taskChan := make(chan *locate.Store, len(stores)) + concurrency := min(broadcastMaxConcurrency, len(stores)) + taskChan := make(chan *locate.Store, concurrency) - for i := 0; i < broadcastMaxConcurrency; i++ { + for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() From 7bad9fd8fa3405f16305f9a67936ce6abd072725 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 19 Sep 2024 20:28:59 +0800 Subject: [PATCH 11/21] refactor broadcastToAllStores to use store goroutine pool Signed-off-by: ekexium --- internal/locate/region_cache.go | 4 +- txnkv/transaction/2pc.go | 101 +++++++++++++++------------ txnkv/transaction/pipelined_flush.go | 2 + txnkv/transaction/txn.go | 1 + 4 files changed, 60 insertions(+), 48 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 36d13cbfa0..cf6bd7b107 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -743,11 +743,11 @@ func refreshFullStoreList(ctx context.Context, pdClient pd.Client, stores storeC return } for _, store := range storeList { - s, exist := stores.get(store.GetId()) + _, exist := stores.get(store.GetId()) if exist { continue } - s = stores.getOrInsertDefault(store.GetId()) + s := stores.getOrInsertDefault(store.GetId()) if store == nil || store.GetState() == metapb.StoreState_Tombstone { s.setResolveState(tombstone) return diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 704859718b..cc362f6ecf 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1351,6 +1351,7 @@ func keepAlive( // broadcast to all stores if isPipelinedTxn { broadcastToAllStores( + c.txn, c.store, retry.NewBackofferWithVars( context.Background(), @@ -1372,66 +1373,74 @@ func keepAlive( } } -const broadcastRpcTimeout = time.Millisecond * 500 +const broadcastRpcTimeout = time.Second * 5 const broadcastMaxConcurrency = 10 -// broadcasts to all stores to update the minCommitTSMgr. Ignore errors. +// broadcastToAllStores asynchronously broadcasts the transaction status to all stores +// errors are ignored. func broadcastToAllStores( + txn *KVTxn, store kvstore, bo *retry.Backoffer, status *kvrpcpb.TxnStatus, resourceGroupName string, resourceGroupTag []byte, ) { - stores := store.GetRegionCache().GetStoresByType(tikvrpc.TiKV) - req := tikvrpc.NewRequest( - tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{ - TxnStatus: []*kvrpcpb.TxnStatus{status}, - }, - ) - req.Context.ClusterId = store.GetClusterID() - req.Context.ResourceControlContext = &kvrpcpb.ResourceControlContext{ - ResourceGroupName: resourceGroupName, - } - req.Context.ResourceGroupTag = resourceGroupTag - - var wg sync.WaitGroup - errChan := make(chan error, len(stores)) - concurrency := min(broadcastMaxConcurrency, len(stores)) - taskChan := make(chan *locate.Store, concurrency) - - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for s := range taskChan { - _, err := store.GetTiKVClient().SendRequest( - bo.GetCtx(), - s.GetAddr(), - req, - broadcastRpcTimeout, - ) - if err != nil { - errChan <- err + broadcastFunc := func() { + stores := store.GetRegionCache().GetStoresByType(tikvrpc.TiKV) + req := tikvrpc.NewRequest( + tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{ + TxnStatus: []*kvrpcpb.TxnStatus{status}, + }, + ) + req.Context.ClusterId = store.GetClusterID() + req.Context.ResourceControlContext = &kvrpcpb.ResourceControlContext{ + ResourceGroupName: resourceGroupName, + } + req.Context.ResourceGroupTag = resourceGroupTag + + var wg sync.WaitGroup + concurrency := min(broadcastMaxConcurrency, len(stores)) + taskChan := make(chan *locate.Store, concurrency) + + for i := 0; i < concurrency; i++ { + wg.Add(1) + if err := txn.spawnWithStorePool(func() { + defer wg.Done() + for s := range taskChan { + _, err := store.GetTiKVClient().SendRequest( + bo.GetCtx(), + s.GetAddr(), + req, + broadcastRpcTimeout, + ) + if err != nil { + logutil.Logger(store.Ctx()).Info( + "broadcast txn status failed", + zap.Uint64("storeID", s.StoreID()), + zap.String("storeAddr", s.GetAddr()), + zap.Stringer("status", status), + zap.Error(err), + ) + } } + }); err != nil { + wg.Done() // Ensure wg is decremented if spawning fails + logutil.Logger(store.Ctx()).Error("failed to spawn worker goroutine", zap.Error(err)) } - }() - } + } - for _, s := range stores { - taskChan <- s - } + for _, s := range stores { + taskChan <- s + } - close(taskChan) - wg.Wait() - close(errChan) + close(taskChan) + wg.Wait() + } - for err := range errChan { - logutil.Logger(bo.GetCtx()).Info( - "broadcast txn status failed", - zap.Stringer("status", status), - zap.Error(err), - ) + if err := txn.spawnWithStorePool(broadcastFunc); err != nil { + logutil.Logger(store.Ctx()).Error("failed to spawn goroutine for broadcasting txn status", + zap.Error(err)) } } diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 945f959eed..33b0a4f59e 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -339,6 +339,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { zap.Uint64("commitTS", commitTS), ) broadcastToAllStores( + c.txn, c.store, retry.NewBackofferWithVars( bo.GetCtx(), @@ -512,6 +513,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end time.Sleep(broadcastGracePeriod) broadcastToAllStores( + c.txn, c.store, retry.NewBackofferWithVars( bo.GetCtx(), diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 9e09569c2b..7c124a6782 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -843,6 +843,7 @@ func (txn *KVTxn) Rollback() error { pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd needCleanUpLocks := len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 broadcastToAllStores( + txn, txn.committer.store, retry.NewBackofferWithVars( txn.store.Ctx(), From 4f23a9e452fdcbf04e78346b46cbab1859775e95 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 19 Sep 2024 20:29:38 +0800 Subject: [PATCH 12/21] set refreshStoreListInterval to 10s Signed-off-by: ekexium --- internal/locate/region_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index cf6bd7b107..8ba5583852 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2686,7 +2686,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV const cleanCacheInterval = time.Second const cleanRegionNumPerRound = 50 -const refreshStoreListInterval = time.Second +const refreshStoreListInterval = 10 * time.Second // gcScanItemHook is only used for testing var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)]) From 1a9447b01940ff3dfae447cd3d75333ad57234ba Mon Sep 17 00:00:00 2001 From: ekexium Date: Fri, 20 Sep 2024 15:18:03 +0800 Subject: [PATCH 13/21] fix: change a read lock to write lock Signed-off-by: ekexium --- txnkv/transaction/pipelined_flush.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 33b0a4f59e..1fa81bd445 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -330,9 +330,9 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { if err = c.commitMutations(bo, &primaryMutation); err != nil { return errors.Trace(err) } - c.mu.RLock() + c.mu.Lock() c.mu.committed = true - c.mu.RUnlock() + c.mu.Unlock() logutil.Logger(bo.GetCtx()).Info( "[pipelined dml] transaction is committed", zap.Uint64("startTS", c.startTS), From e61135013d969120faf55a232fcbd66683118fe8 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 23 Sep 2024 14:56:28 +0800 Subject: [PATCH 14/21] Apply suggestions from code review Co-authored-by: crazycs --- internal/locate/region_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 8ba5583852..e6fa30b28f 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -750,11 +750,11 @@ func refreshFullStoreList(ctx context.Context, pdClient pd.Client, stores storeC s := stores.getOrInsertDefault(store.GetId()) if store == nil || store.GetState() == metapb.StoreState_Tombstone { s.setResolveState(tombstone) - return + continue } addr := store.GetAddress() if addr == "" { - return + continue } // TODO: maybe refactor this, together with other places initializing Store s.addr = addr From f165547a3425559f29f525d5f2a86ed872ddab54 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 23 Sep 2024 17:23:55 +0800 Subject: [PATCH 15/21] fix mistaken renaming Signed-off-by: ekexium --- txnkv/transaction/2pc.go | 4 ++-- txnkv/transaction/prewrite.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index cc362f6ecf..cf99f660af 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1302,7 +1302,7 @@ func keepAlive( return } - // update minCommitTSMgr, if it's a non-async-commit pipelined transaction + // update minCommitTS, if it's a non-async-commit pipelined transaction if isPipelinedTxn && !c.isOnePC() && !c.isAsyncCommit() && @@ -1847,7 +1847,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if c.isAsyncCommit() { if c.minCommitTSMgr.get() == 0 { - return errors.Errorf("session %d invalid minCommitTSMgr for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS) + return errors.Errorf("session %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS) } commitTS = c.minCommitTSMgr.get() } else { diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index f8edfb0233..26e317e106 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -413,7 +413,7 @@ func (action actionPrewrite) handleSingleBatch( return nil } logutil.Logger(bo.GetCtx()).Warn( - "async commit cannot proceed since the returned minCommitTSMgr is zero, "+ + "async commit cannot proceed since the returned minCommitTS is zero, "+ "fallback to normal path", zap.Uint64("startTS", c.startTS), ) c.setAsyncCommit(false) From ecb33d5a83b76e37497fe8f4b8ba046a13e32658 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 23 Sep 2024 17:24:07 +0800 Subject: [PATCH 16/21] Update txnkv/transaction/pipelined_flush.go Co-authored-by: crazycs --- txnkv/transaction/pipelined_flush.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 1fa81bd445..8cadf63df8 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -457,11 +457,9 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end const RESOLVE_CONCURRENCY = 8 var resolved atomic.Uint64 handler, err := c.buildPipelinedResolveHandler(commit, &resolved) - var commitTs uint64 + commitTs := uint64(0) if commit { commitTs = atomic.LoadUint64(&c.commitTS) - } else { - commitTs = 0 } if err != nil { logutil.Logger(bo.GetCtx()).Error( From 9de2ac9c18dde6b6105b891caab9466b4f8ffd93 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 23 Sep 2024 17:40:42 +0800 Subject: [PATCH 17/21] fix rpc code Signed-off-by: ekexium --- tikvrpc/tikvrpc.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 5d66c94e6e..96a480c91e 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -739,6 +739,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res return &Response{Resp: res.BufferBatchGet}, nil case *tikvpb.BatchCommandsResponse_Response_GetHealthFeedback: return &Response{Resp: res.GetHealthFeedback}, nil + case *tikvpb.BatchCommandsResponse_Response_BroadcastTxnStatus: + return &Response{Resp: res.BroadcastTxnStatus}, nil } panic("unreachable") } From ca93ef6c34e21ccd853d4b09b7fca2e5385fbd88 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 24 Sep 2024 14:57:09 +0800 Subject: [PATCH 18/21] comment the check of return values of GetAllStores Signed-off-by: ekexium --- internal/locate/region_cache.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index e6fa30b28f..6ada53055f 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -748,6 +748,8 @@ func refreshFullStoreList(ctx context.Context, pdClient pd.Client, stores storeC continue } s := stores.getOrInsertDefault(store.GetId()) + // GetAllStores is supposed to return only Up and Offline stores. + // This check is being defensive and to make it consistent with store resolve code. if store == nil || store.GetState() == metapb.StoreState_Tombstone { s.setResolveState(tombstone) continue From 4a9b7793b44991138367f309ad87cf1b9543cdfd Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 24 Sep 2024 16:09:37 +0800 Subject: [PATCH 19/21] update kvproto Signed-off-by: ekexium --- go.mod | 4 +--- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 96f3d30010..d226e9750d 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 @@ -59,5 +59,3 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/pingcap/kvproto => github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37 diff --git a/go.sum b/go.sum index 71cd91e747..029e0d04d7 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,6 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37 h1:wwoMiZm/cK9466IX3QUPgogxH+HsTuAVAouHLlBJLh0= -github.com/ekexium/kvproto v0.0.0-20240918103128-cc0810e58b37/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -76,6 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= +github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d h1:vSdKTrF6kpcd56G5BLP0Bz88Nho2tDo7IR1+oSsBAfc= +github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= From 4aada7ad890a91cc6685509f9aea8ee31c6f7dff Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 24 Sep 2024 16:26:57 +0800 Subject: [PATCH 20/21] apply suggestions from review Signed-off-by: ekexium --- internal/locate/region_cache.go | 9 ++-- txnkv/transaction/2pc.go | 79 +++++++++++++++++---------------- 2 files changed, 45 insertions(+), 43 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 6ada53055f..01dfa385ca 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -728,7 +728,7 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { } c.bg.schedule( func(ctx context.Context, _ time.Time) bool { - refreshFullStoreList(ctx, c.PDClient(), c.stores) + refreshFullStoreList(ctx, c.stores) return false }, refreshStoreListInterval, ) @@ -736,8 +736,8 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { } // Try to refresh full store list. Errors are ignored. -func refreshFullStoreList(ctx context.Context, pdClient pd.Client, stores storeCache) { - storeList, err := pdClient.GetAllStores(ctx) +func refreshFullStoreList(ctx context.Context, stores storeCache) { + storeList, err := stores.fetchAllStores(ctx) if err != nil { logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err)) return @@ -747,17 +747,16 @@ func refreshFullStoreList(ctx context.Context, pdClient pd.Client, stores storeC if exist { continue } - s := stores.getOrInsertDefault(store.GetId()) // GetAllStores is supposed to return only Up and Offline stores. // This check is being defensive and to make it consistent with store resolve code. if store == nil || store.GetState() == metapb.StoreState_Tombstone { - s.setResolveState(tombstone) continue } addr := store.GetAddress() if addr == "" { continue } + s := stores.getOrInsertDefault(store.GetId()) // TODO: maybe refactor this, together with other places initializing Store s.addr = addr s.peerAddr = store.GetPeerAddress() diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index cf99f660af..2a01befe6c 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1376,8 +1376,8 @@ func keepAlive( const broadcastRpcTimeout = time.Second * 5 const broadcastMaxConcurrency = 10 -// broadcastToAllStores asynchronously broadcasts the transaction status to all stores -// errors are ignored. +// broadcastToAllStores asynchronously broadcasts the transaction status to all stores. +// Errors are ignored. func broadcastToAllStores( txn *KVTxn, store kvstore, @@ -1388,53 +1388,56 @@ func broadcastToAllStores( ) { broadcastFunc := func() { stores := store.GetRegionCache().GetStoresByType(tikvrpc.TiKV) - req := tikvrpc.NewRequest( - tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{ - TxnStatus: []*kvrpcpb.TxnStatus{status}, - }, - ) - req.Context.ClusterId = store.GetClusterID() - req.Context.ResourceControlContext = &kvrpcpb.ResourceControlContext{ - ResourceGroupName: resourceGroupName, - } - req.Context.ResourceGroupTag = resourceGroupTag + concurrency := min(broadcastMaxConcurrency, len(stores)) + rateLimit := make(chan struct{}, concurrency) var wg sync.WaitGroup - concurrency := min(broadcastMaxConcurrency, len(stores)) - taskChan := make(chan *locate.Store, concurrency) - for i := 0; i < concurrency; i++ { + for _, s := range stores { + rateLimit <- struct{}{} wg.Add(1) - if err := txn.spawnWithStorePool(func() { + target := s + + err := txn.spawnWithStorePool(func() { defer wg.Done() - for s := range taskChan { - _, err := store.GetTiKVClient().SendRequest( - bo.GetCtx(), - s.GetAddr(), - req, - broadcastRpcTimeout, + defer func() { <-rateLimit }() + + req := tikvrpc.NewRequest( + tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{ + TxnStatus: []*kvrpcpb.TxnStatus{status}, + }, + ) + req.Context.ClusterId = store.GetClusterID() + req.Context.ResourceControlContext = &kvrpcpb.ResourceControlContext{ + ResourceGroupName: resourceGroupName, + } + req.Context.ResourceGroupTag = resourceGroupTag + + _, err := store.GetTiKVClient().SendRequest( + bo.GetCtx(), + target.GetAddr(), + req, + broadcastRpcTimeout, + ) + if err != nil { + logutil.Logger(store.Ctx()).Info( + "broadcast txn status failed", + zap.Uint64("storeID", target.StoreID()), + zap.String("storeAddr", target.GetAddr()), + zap.Stringer("status", status), + zap.Error(err), ) - if err != nil { - logutil.Logger(store.Ctx()).Info( - "broadcast txn status failed", - zap.Uint64("storeID", s.StoreID()), - zap.String("storeAddr", s.GetAddr()), - zap.Stringer("status", status), - zap.Error(err), - ) - } } - }); err != nil { - wg.Done() // Ensure wg is decremented if spawning fails + }) + + if err != nil { + // If spawning the goroutine fails, release the slot and mark done + <-rateLimit + wg.Done() logutil.Logger(store.Ctx()).Error("failed to spawn worker goroutine", zap.Error(err)) } } - for _, s := range stores { - taskChan <- s - } - - close(taskChan) wg.Wait() } From ea8180b5b91964f7c4b6e5f1df858781a34f2a49 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 24 Sep 2024 17:44:28 +0800 Subject: [PATCH 21/21] update integration_tests dependency Signed-off-by: ekexium --- integration_tests/go.mod | 2 +- integration_tests/go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 8e2f7d9302..c5b8f4a682 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,7 +6,7 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240527053858-9b3b6e34194a - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d github.com/pingcap/tidb v1.1.0-beta.0.20240703042657-230bbc2ef5ef github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.9.0 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index d50e94ff3b..77a57ebb5a 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -357,8 +357,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d h1:vSdKTrF6kpcd56G5BLP0Bz88Nho2tDo7IR1+oSsBAfc= +github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=