From 661023129ecf0d3c70bb9958a598aa67ec6149dc Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 22 May 2024 11:46:15 +0800 Subject: [PATCH 1/2] sort batches by both chunk & region; remove success ranges Signed-off-by: Ping Yu --- txnkv/transaction/txn_file.go | 45 +++---- util/merge_ranges.go | 134 -------------------- util/merge_ranges_test.go | 225 ---------------------------------- 3 files changed, 24 insertions(+), 380 deletions(-) delete mode 100644 util/merge_ranges.go delete mode 100644 util/merge_ranges_test.go diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go index a44e0255bf..a444d78124 100644 --- a/txnkv/transaction/txn_file.go +++ b/txnkv/transaction/txn_file.go @@ -40,7 +40,6 @@ import ( "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" - "github.com/tikv/client-go/v2/util" atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) @@ -142,7 +141,12 @@ func (cs *txnChunkSlice) len() int { // []chunkBatch is sorted by region.StartKey. // Note: regions may be overlapping. func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoffer) ([]chunkBatch, error) { - batchMap := make(map[locate.RegionVerID]*chunkBatch) + // Do not use `locate.RegionVerID` as map key to avoid grouping chunks to different batches when `confVer` changes. + type batchMapKey struct { + regionID uint64 + regionVer uint64 + } + batchMap := make(map[batchMapKey]*chunkBatch) for i, chunkRange := range cs.chunkRanges { regions, err := chunkRange.getOverlapRegions(c, bo) if err != nil { @@ -150,12 +154,13 @@ func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoff } for _, r := range regions { - if batchMap[r.Region] == nil { - batchMap[r.Region] = &chunkBatch{ + key := batchMapKey{regionID: r.Region.GetID(), regionVer: r.Region.GetVer()} + if batchMap[key] == nil { + batchMap[key] = &chunkBatch{ region: r, } } - batchMap[r.Region].append(cs.chunkIDs[i], chunkRange) + batchMap[key].append(cs.chunkIDs[i], chunkRange) } } @@ -164,10 +169,17 @@ func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoff batches = append(batches, *batch) } sort.Slice(batches, func(i, j int) bool { - return bytes.Compare(batches[i].region.StartKey, batches[j].region.StartKey) < 0 + // Sort by both chunks and region, to make sure that primary key is in the first batch: + // 1. Different batches may contain the same chunks. + // 2. Different batches may have regions with same start key (if region merge happens during grouping) + cmp := bytes.Compare(batches[i].region.StartKey, batches[j].region.StartKey) + if cmp == 0 { + return bytes.Compare(batches[i].Smallest(), batches[j].Smallest()) < 0 + } + return cmp < 0 }) - logutil.Logger(bo.GetCtx()).Debug("txn file group to batches", zap.Stringers("batches", batches)) + logutil.Logger(bo.GetCtx()).Info("txn file group to batches", zap.Stringers("batches", batches)) return batches, nil } @@ -553,7 +565,7 @@ func (c *twoPhaseCommitter) executeTxnFile(ctx context.Context) (err error) { return } -func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction, successRanges *util.MergeRanges) (txnChunkSlice, error) { +func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction) (txnChunkSlice, error) { var err error var regionErrChunks txnChunkSlice @@ -565,10 +577,6 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice } for _, batch := range batches { - if successRanges.Covered(batch.region.StartKey, batch.region.EndKey) { - continue - } - resp, err1 := action.executeBatch(c, bo, batch) logutil.Logger(bo.GetCtx()).Debug("txn file: execute batch finished", zap.Uint64("startTS", c.startTS), @@ -605,18 +613,16 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice regionErrChunks.appendSlice(&batch.txnChunkSlice) continue } - - successRanges.Insert(batch.region.StartKey, batch.region.EndKey) } return regionErrChunks, nil } -func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction, successRanges *util.MergeRanges) error { +func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction) error { currentChunks := chunkSlice currentBatches := batches for { var regionErrChunks txnChunkSlice - regionErrChunks, err := c.executeTxnFileSlice(bo, currentChunks, currentBatches, action, successRanges) + regionErrChunks, err := c.executeTxnFileSlice(bo, currentChunks, currentBatches, action) if err != nil { return errors.WithStack(err) } @@ -687,18 +693,15 @@ func (c *twoPhaseCommitter) executeTxnFileAction(bo *retry.Backoffer, chunkSlice if len(secondaries) == 0 { return nil } - primaryRegion := batches[0].region - successRanges := util.NewMergeRanges() - successRanges.Insert(primaryRegion.StartKey, primaryRegion.EndKey) var emptySlice txnChunkSlice if !action.asyncExecuteSecondaries() { - return c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action, successRanges) + return c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action) } c.store.WaitGroup().Add(1) errGo := c.store.Go(func() { defer c.store.WaitGroup().Done() - err := c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action, successRanges) + err := c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action) logutil.Logger(bo.GetCtx()).Debug("txn file: async execute secondaries finished", zap.Uint64("startTS", c.startTS), zap.Stringer("action", action), diff --git a/util/merge_ranges.go b/util/merge_ranges.go deleted file mode 100644 index 1d7e2a5ca0..0000000000 --- a/util/merge_ranges.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2024 TiKV Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "bytes" - "container/list" -) - -// To make life easier. -// MergeRanges is used for txn file only, all keys are TiDB keys, and must not be bigger than following `maxEndKey`. -var maxEndKey = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} - -type keyRange struct { - start []byte - end []byte -} - -type MergeRanges struct { - ranges *list.List -} - -func NewMergeRanges() *MergeRanges { - r := &MergeRanges{ - ranges: list.New(), - } - return r -} - -func (m *MergeRanges) IsEmpty() bool { - return m.ranges.Len() == 0 -} - -// searchByStart search range by start key. -// If the key is found, return the element and true. -// If the key is not found, return the element to insert BEFORE and false. When the mark is nil, push to the back. -func (m *MergeRanges) searchByStart(key []byte) (mark *list.Element, ok bool) { - for e := m.ranges.Front(); e != nil; e = e.Next() { - cmp := bytes.Compare(e.Value.(*keyRange).start, key) - if cmp == 0 { - return e, true - } else if cmp > 0 { - return e, false - } - } - return nil, false -} - -// searchOverlapped search the overlapped range for key. -// range.start <= key <= range.end -// If the key is found, return the element and true. -// If the key is not found, return the element to insert BEFORE and false. When the mark is nil, push to the back. -func (m *MergeRanges) searchOverlapped(key []byte) (mark *list.Element, overlapped bool) { - mark, overlapped = m.searchByStart(key) - if overlapped { - return mark, true - } - - var prev *list.Element - if mark == nil { - prev = m.ranges.Back() - } else { - prev = mark.Prev() - } - - if prev == nil { - return m.ranges.Front(), false - } - if bytes.Compare(key, prev.Value.(*keyRange).end) <= 0 { - return prev, true - } - return mark, false -} - -// Covered returns whether the range [start,end) is fully covered. -func (m *MergeRanges) Covered(start []byte, end []byte) bool { - if len(end) == 0 { - end = maxEndKey - } - - mark, ok := m.searchOverlapped(start) - if ok { - return bytes.Compare(end, mark.Value.(*keyRange).end) <= 0 - } - return false -} - -// Insert inserts the range [start,end) into MergeRanges. -func (m *MergeRanges) Insert(start []byte, end []byte) { - if len(end) == 0 { - end = maxEndKey - } - - left, ok := m.searchOverlapped(start) - if ok { - if bytes.Compare(end, left.Value.(*keyRange).end) <= 0 { - return - } - start = left.Value.(*keyRange).start - } - - if left == nil { - m.ranges.PushBack(&keyRange{start, end}) - return - } - - right, ok := m.searchOverlapped(end) - rightBound := right - if ok { - end = right.Value.(*keyRange).end - rightBound = right.Next() - } - - m.ranges.InsertBefore(&keyRange{start, end}, left) - - // Remove overlapped. - var next *list.Element - for e := left; e != rightBound; e = next { - next = e.Next() - m.ranges.Remove(e) - } -} diff --git a/util/merge_ranges_test.go b/util/merge_ranges_test.go deleted file mode 100644 index d7c558cc72..0000000000 --- a/util/merge_ranges_test.go +++ /dev/null @@ -1,225 +0,0 @@ -// Copyright 2024 TiKV Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "container/list" - "fmt" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestMergeRangesSearchOverlapped(t *testing.T) { - assert := require.New(t) - - mr := NewMergeRanges() - type Case struct { - key int - overlapped bool - rng *keyRange // the range overlap with or after the key - } - checkOverlapped := func(c Case) { - mark, overlapped := mr.searchOverlapped(iToKey(c.key)) - assert.Equal(c.overlapped, overlapped) - if c.rng == nil { - assert.Nil(mark) - } else { - assert.Equal(c.rng, mark.Value.(*keyRange)) - } - } - - // mr: empty - checkOverlapped(Case{0, false, nil}) - - insertRange(mr, 0, 1) - // mr: [0,1) - cases := []Case{ - {0, true, makeRange(0, 1)}, - {1, true, makeRange(0, 1)}, - {2, false, nil}, - } - for _, c := range cases { - checkOverlapped(c) - } - - insertRange(mr, 3, 5) - // mr: [0,1), [3,5) - cases = []Case{ - {0, true, makeRange(0, 1)}, - {1, true, makeRange(0, 1)}, - {2, false, makeRange(3, 5)}, - {3, true, makeRange(3, 5)}, - {4, true, makeRange(3, 5)}, - {5, true, makeRange(3, 5)}, - {6, false, nil}, - } - for _, c := range cases { - checkOverlapped(c) - } -} - -func TestMergeRangesCovered(t *testing.T) { - assert := require.New(t) - - mr := NewMergeRanges() - type Case struct { - start int - end int - covered bool - } - checkCovered := func(c Case) { - assert.Equal(c.covered, mr.Covered(iToKey(c.start), iToKey(c.end))) - } - - // mr: empty - checkCovered(Case{0, 1, false}) - - insertRange(mr, 0, 1) - // mr: [0,1) - cases := []Case{ - {0, 1, true}, - {1, 2, false}, - } - for _, c := range cases { - checkCovered(c) - } - - insertRange(mr, 2, 4) - // mr: [0,1), [2,4) - cases = []Case{ - {0, 1, true}, - {1, 2, false}, - {2, 3, true}, - {2, 4, true}, - {1, 2, false}, - {1, 3, false}, - {1, 4, false}, - {1, 5, false}, - {2, 5, false}, - {4, 5, false}, - {0, 4, false}, - } - for _, c := range cases { - checkCovered(c) - } -} - -func TestMergeRangesInsert(t *testing.T) { - assert := require.New(t) - - mr := NewMergeRanges() - assertEqual := func(keys [][2]int) { - expected := makeMergeRanges(keys) - assert.Equal(expected.ranges, mr.ranges) - } - - insertRange(mr, 2, 3) - assertEqual([][2]int{{2, 3}}) - insertRange(mr, 3, 4) - assertEqual([][2]int{{2, 4}}) - - insertRange(mr, 0, 1) - assertEqual([][2]int{{0, 1}, {2, 4}}) - insertRange(mr, 1, 2) - assertEqual([][2]int{{0, 4}}) - - insertRange(mr, 5, 6) - assertEqual([][2]int{{0, 4}, {5, 6}}) - insertRange(mr, 4, 5) - assertEqual([][2]int{{0, 6}}) - - insertRange(mr, 10, 11) - assertEqual([][2]int{{0, 6}, {10, 11}}) - insertRange(mr, 3, 12) - assertEqual([][2]int{{0, 12}}) - - assert.True(mr.Covered(iToKey(0), iToKey(12))) -} - -func TestMergeRangesOverlappingRanges(t *testing.T) { - assert := require.New(t) - - // mr: [2,12) - mr := makeMergeRanges([][2]int{{2, 12}}) - assertEqual := func(keys [][2]int) { - expected := makeMergeRanges(keys) - assert.Equal(expected.ranges, mr.ranges) - } - - insertRange(mr, 2, 6) - assertEqual([][2]int{{2, 12}}) - - insertRange(mr, 3, 8) - assertEqual([][2]int{{2, 12}}) - - insertRange(mr, 6, 12) - assertEqual([][2]int{{2, 12}}) - - insertRange(mr, 1, 4) - assertEqual([][2]int{{1, 12}}) - - insertRange(mr, 4, 15) - assertEqual([][2]int{{1, 15}}) - - insertRange(mr, 20, 21) - insertRange(mr, 22, 23) - insertRange(mr, 24, 25) - insertRange(mr, 14, 30) - assertEqual([][2]int{{1, 30}}) - - insertRange(mr, 40, 41) - insertRange(mr, 42, 43) - insertRange(mr, 50, 51) - insertRange(mr, 53, 54) - insertRange(mr, 0, 42) - assertEqual([][2]int{{0, 43}, {50, 51}, {53, 54}}) -} - -func TestMergeRangesEmptyKey(t *testing.T) { - assert := require.New(t) - - mr := NewMergeRanges() - - assert.False(mr.Covered(nil, nil)) - - mr.Insert(nil, iToKey(10)) // (infinite, 10) - mr.Insert(iToKey(10), nil) // [10, infinite) - assert.True(mr.Covered(iToKey(0), iToKey(100))) - assert.True(mr.Covered(nil, nil)) -} - -func iToKey(i int) []byte { - return []byte(fmt.Sprintf("%04d", i)) -} - -func makeRange(start, end int) *keyRange { - return &keyRange{ - start: iToKey(start), - end: iToKey(end), - } -} - -func insertRange(mr *MergeRanges, start, end int) { - mr.Insert(iToKey(start), iToKey(end)) -} - -func makeMergeRanges(keys [][2]int) *MergeRanges { - l := list.New() - for _, key := range keys { - l.PushBack(&keyRange{iToKey(key[0]), iToKey(key[1])}) - } - return &MergeRanges{ranges: l} -} From 8ab3da907b48fbad95d5d1e04c13af33025fa4d4 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 22 May 2024 15:09:40 +0800 Subject: [PATCH 2/2] polish Signed-off-by: Ping Yu --- txnkv/transaction/txn_file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txnkv/transaction/txn_file.go b/txnkv/transaction/txn_file.go index a444d78124..b615375f03 100644 --- a/txnkv/transaction/txn_file.go +++ b/txnkv/transaction/txn_file.go @@ -171,7 +171,7 @@ func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoff sort.Slice(batches, func(i, j int) bool { // Sort by both chunks and region, to make sure that primary key is in the first batch: // 1. Different batches may contain the same chunks. - // 2. Different batches may have regions with same start key (if region merge happens during grouping) + // 2. Different batches may have regions with same start key (if region merge happens during grouping). cmp := bytes.Compare(batches[i].region.StartKey, batches[j].region.StartKey) if cmp == 0 { return bytes.Compare(batches[i].Smallest(), batches[j].Smallest()) < 0