txn-file: Fix missing txn chunks in prewrite #1350

Merged (2 commits, May 27, 2024)
txnkv/transaction/txn_file.go (24 additions, 21 deletions)
@@ -40,7 +40,6 @@ import (
     "github.com/tikv/client-go/v2/metrics"
     "github.com/tikv/client-go/v2/tikvrpc"
     "github.com/tikv/client-go/v2/txnkv/txnlock"
-    "github.com/tikv/client-go/v2/util"
     atomicutil "go.uber.org/atomic"
     "go.uber.org/zap"
 )
@@ -142,20 +141,26 @@ func (cs *txnChunkSlice) len() int {
 // []chunkBatch is sorted by region.StartKey.
 // Note: regions may be overlapping.
 func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoffer) ([]chunkBatch, error) {
-    batchMap := make(map[locate.RegionVerID]*chunkBatch)
+    // Do not use `locate.RegionVerID` as map key to avoid grouping chunks to different batches when `confVer` changes.
+    type batchMapKey struct {
+        regionID uint64
+        regionVer uint64
+    }
+    batchMap := make(map[batchMapKey]*chunkBatch)
     for i, chunkRange := range cs.chunkRanges {
         regions, err := chunkRange.getOverlapRegions(c, bo)
         if err != nil {
             return nil, errors.WithStack(err)
         }

         for _, r := range regions {
-            if batchMap[r.Region] == nil {
-                batchMap[r.Region] = &chunkBatch{
+            key := batchMapKey{regionID: r.Region.GetID(), regionVer: r.Region.GetVer()}
+            if batchMap[key] == nil {
+                batchMap[key] = &chunkBatch{
                     region: r,
                 }
             }
-            batchMap[r.Region].append(cs.chunkIDs[i], chunkRange)
+            batchMap[key].append(cs.chunkIDs[i], chunkRange)
         }
     }
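
Note: the comment in the hunk above explains why the map key deliberately excludes `confVer`. The following is a minimal, self-contained sketch of that grouping idea under assumed types; `regionRef`, `batchKey`, `batch`, and `groupChunks` are hypothetical stand-ins, not client-go APIs.

package main

import "fmt"

// regionRef mimics a region lookup result; confVer can change (e.g. on a
// config change) without the region's data range or version changing.
type regionRef struct {
    id, ver, confVer uint64
}

// batchKey deliberately excludes confVer, mirroring the batchMapKey above.
type batchKey struct {
    regionID, regionVer uint64
}

type batch struct {
    key      batchKey
    chunkIDs []int
}

// groupChunks collects every chunk overlapping a given region version under one key.
func groupChunks(overlaps map[int][]regionRef) map[batchKey]*batch {
    batches := make(map[batchKey]*batch)
    for chunkID, regions := range overlaps {
        for _, r := range regions {
            k := batchKey{regionID: r.id, regionVer: r.ver}
            if batches[k] == nil {
                batches[k] = &batch{key: k}
            }
            batches[k].chunkIDs = append(batches[k].chunkIDs, chunkID)
        }
    }
    return batches
}

func main() {
    // Both chunks overlap the same region, but confVer was bumped between the
    // two lookups. Keying on (id, ver) still puts them in a single batch.
    overlaps := map[int][]regionRef{
        0: {{id: 7, ver: 3, confVer: 1}},
        1: {{id: 7, ver: 3, confVer: 2}},
    }
    for k, b := range groupChunks(overlaps) {
        fmt.Printf("region %d (ver %d): chunks %v\n", k.regionID, k.regionVer, b.chunkIDs)
    }
}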

@@ -164,10 +169,17 @@ func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoff
         batches = append(batches, *batch)
     }
     sort.Slice(batches, func(i, j int) bool {
-        return bytes.Compare(batches[i].region.StartKey, batches[j].region.StartKey) < 0
+        // Sort by both chunks and region, to make sure that primary key is in the first batch:
+        // 1. Different batches may contain the same chunks.
+        // 2. Different batches may have regions with same start key (if region merge happens during grouping).
+        cmp := bytes.Compare(batches[i].region.StartKey, batches[j].region.StartKey)
+        if cmp == 0 {
+            return bytes.Compare(batches[i].Smallest(), batches[j].Smallest()) < 0
+        }
+        return cmp < 0
     })

-    logutil.Logger(bo.GetCtx()).Debug("txn file group to batches", zap.Stringers("batches", batches))
+    logutil.Logger(bo.GetCtx()).Info("txn file group to batches", zap.Stringers("batches", batches))
     return batches, nil
 }
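
Note: the new comparator orders batches by region start key and breaks ties with the smallest chunk key, so the batch holding the primary key sorts first even when two batches share a start key (e.g. after a region merge). A small illustrative sketch of that ordering follows; `fakeBatch` is a hypothetical type, not the real `chunkBatch`.

package main

import (
    "bytes"
    "fmt"
    "sort"
)

type fakeBatch struct {
    regionStart []byte // region.StartKey
    smallest    []byte // smallest key among the chunks in this batch
}

// sortBatches mirrors the two-level comparator above.
func sortBatches(batches []fakeBatch) {
    sort.Slice(batches, func(i, j int) bool {
        cmp := bytes.Compare(batches[i].regionStart, batches[j].regionStart)
        if cmp == 0 {
            return bytes.Compare(batches[i].smallest, batches[j].smallest) < 0
        }
        return cmp < 0
    })
}

func main() {
    batches := []fakeBatch{
        {regionStart: []byte("a"), smallest: []byte("c")},
        {regionStart: []byte("a"), smallest: []byte("b")}, // this one holds the primary key "b"
    }
    sortBatches(batches)
    fmt.Printf("first batch starts with key %q\n", batches[0].smallest) // "b"
}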

@@ -553,7 +565,7 @@ func (c *twoPhaseCommitter) executeTxnFile(ctx context.Context) (err error) {
     return
 }

-func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction, successRanges *util.MergeRanges) (txnChunkSlice, error) {
+func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction) (txnChunkSlice, error) {
     var err error
     var regionErrChunks txnChunkSlice

@@ -565,10 +577,6 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice
     }

     for _, batch := range batches {
-        if successRanges.Covered(batch.region.StartKey, batch.region.EndKey) {
-            continue
-        }
-
         resp, err1 := action.executeBatch(c, bo, batch)
         logutil.Logger(bo.GetCtx()).Debug("txn file: execute batch finished",
             zap.Uint64("startTS", c.startTS),
@@ -605,18 +613,16 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice
             regionErrChunks.appendSlice(&batch.txnChunkSlice)
             continue
         }
-
-        successRanges.Insert(batch.region.StartKey, batch.region.EndKey)
     }
     return regionErrChunks, nil
 }

-func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction, successRanges *util.MergeRanges) error {
+func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction) error {
     currentChunks := chunkSlice
     currentBatches := batches
     for {
         var regionErrChunks txnChunkSlice
-        regionErrChunks, err := c.executeTxnFileSlice(bo, currentChunks, currentBatches, action, successRanges)
+        regionErrChunks, err := c.executeTxnFileSlice(bo, currentChunks, currentBatches, action)
         if err != nil {
             return errors.WithStack(err)
         }
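
Note: with the `successRanges` bookkeeping removed, the retry loop alone decides what is re-sent: each pass returns the chunks that hit region errors, and only those are regrouped and retried. Below is a rough sketch of that loop shape; `chunk`, `executeOnce`, and `executeWithRetry` are hypothetical stand-ins for the real batch execution.

package main

import "fmt"

type chunk struct{ id int }

// executeOnce stands in for one pass over the grouped batches. It returns the
// chunks that hit a (simulated) region error; here odd chunk IDs fail only on
// the first attempt.
func executeOnce(chunks []chunk, attempt int) (regionErrChunks []chunk) {
    for _, c := range chunks {
        if attempt == 0 && c.id%2 == 1 {
            regionErrChunks = append(regionErrChunks, c)
        }
    }
    return regionErrChunks
}

// executeWithRetry mirrors the loop above: keep retrying the chunks that saw
// region errors until a pass produces none.
func executeWithRetry(chunks []chunk) {
    current := chunks
    for attempt := 0; ; attempt++ {
        failed := executeOnce(current, attempt)
        if len(failed) == 0 {
            return
        }
        fmt.Printf("attempt %d: retrying %d chunks after region errors\n", attempt, len(failed))
        // The real code regroups these chunks into fresh region batches here.
        current = failed
    }
}

func main() {
    executeWithRetry([]chunk{{0}, {1}, {2}, {3}})
}
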
@@ -687,18 +693,15 @@ func (c *twoPhaseCommitter) executeTxnFileAction(bo *retry.Backoffer, chunkSlice
     if len(secondaries) == 0 {
         return nil
     }
-    primaryRegion := batches[0].region
-    successRanges := util.NewMergeRanges()
-    successRanges.Insert(primaryRegion.StartKey, primaryRegion.EndKey)
     var emptySlice txnChunkSlice
     if !action.asyncExecuteSecondaries() {
-        return c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action, successRanges)
+        return c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action)
     }

     c.store.WaitGroup().Add(1)
     errGo := c.store.Go(func() {
         defer c.store.WaitGroup().Done()
-        err := c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action, successRanges)
+        err := c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action)
         logutil.Logger(bo.GetCtx()).Debug("txn file: async execute secondaries finished",
             zap.Uint64("startTS", c.startTS),
             zap.Stringer("action", action),
util/merge_ranges.go (0 additions, 134 deletions)

This file was deleted.
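
Note: the deleted helper is visible in this PR only through the call sites removed above (`util.NewMergeRanges`, `successRanges.Insert`, `successRanges.Covered`). Presumably prewrite used it to skip any batch whose region key range was already marked successful; since regions may overlap and different batches may carry the same chunks, that skip could leave chunks unwritten, which matches the missing-chunk issue in the title. The sketch below is an illustrative interval set reconstructed from those call sites alone, not the contents of the deleted file.

package main

import (
    "bytes"
    "fmt"
    "sort"
)

type keyRange struct{ start, end []byte }

// MergeRanges keeps a sorted set of merged [start, end) key ranges.
type MergeRanges struct{ ranges []keyRange }

func NewMergeRanges() *MergeRanges { return &MergeRanges{} }

// Insert adds [start, end) and folds it together with any overlapping or
// adjacent ranges already present.
func (m *MergeRanges) Insert(start, end []byte) {
    merged := keyRange{start: start, end: end}
    var rest []keyRange
    for _, r := range m.ranges {
        if bytes.Compare(r.end, merged.start) < 0 || bytes.Compare(merged.end, r.start) < 0 {
            rest = append(rest, r) // disjoint: keep as-is
            continue
        }
        if bytes.Compare(r.start, merged.start) < 0 {
            merged.start = r.start
        }
        if bytes.Compare(r.end, merged.end) > 0 {
            merged.end = r.end
        }
    }
    rest = append(rest, merged)
    sort.Slice(rest, func(i, j int) bool { return bytes.Compare(rest[i].start, rest[j].start) < 0 })
    m.ranges = rest
}

// Covered reports whether [start, end) lies entirely inside one merged range.
func (m *MergeRanges) Covered(start, end []byte) bool {
    for _, r := range m.ranges {
        if bytes.Compare(r.start, start) <= 0 && bytes.Compare(r.end, end) >= 0 {
            return true
        }
    }
    return false
}

func main() {
    m := NewMergeRanges()
    m.Insert([]byte("a"), []byte("c"))
    m.Insert([]byte("c"), []byte("f"))
    fmt.Println(m.Covered([]byte("b"), []byte("e"))) // true: the two ranges merged into [a, f)
}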
