Skip to content

Commit

Permalink
external: fix decoding the last key in split keys (#59742)
Browse files Browse the repository at this point in the history
ref #59725
  • Loading branch information
CbcWestwolf authored Feb 25, 2025
1 parent b33c5bc commit 5ba8cf9
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 28 deletions.
1 change: 1 addition & 0 deletions pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/metrics",
"//pkg/util",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/size",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
69 changes: 41 additions & 28 deletions pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -98,7 +99,6 @@ type Engine struct {
endKey []byte
jobKeys [][]byte
splitKeys [][]byte
regionSplitSize int64
smallBlockBufPool *membuf.Pool
largeBlockBufPool *membuf.Pool

Expand Down Expand Up @@ -348,19 +348,11 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, jobKeys [][]byte, outC
prev = cur
}
// last range key may be a nextKey so we should try to remove the trailing 0 if decoding failed
nextKey := false
lastKey := jobKeys[len(jobKeys)-1]
cur, err4 := e.keyAdapter.Decode(nil, lastKey)
if err4 != nil && lastKey[len(lastKey)-1] == 0 {
nextKey = true
cur, err4 = e.keyAdapter.Decode(nil, lastKey[:len(lastKey)-1])
}
cur, err4 := e.tryDecodeEndKey(lastKey)
if err4 != nil {
return err4
}
if nextKey {
cur = kv.Key(cur).Next()
}
ranges = append(ranges, common.Range{
Start: prev,
End: cur,
Expand Down Expand Up @@ -438,43 +430,64 @@ func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
return e.startKey, e.endKey, nil
}

// when duplicate detection feature is enabled, the end key comes from
// DupDetectKeyAdapter.Encode or Key.Next(). We try to decode it and check the
// error.

start, err := e.keyAdapter.Decode(nil, e.startKey)
startKey, err = e.keyAdapter.Decode(nil, e.startKey)
if err != nil {
return nil, nil, err
}
end, err := e.keyAdapter.Decode(nil, e.endKey)
if err == nil {
return start, end, nil
}
// handle the case that end key is from Key.Next()
if e.endKey[len(e.endKey)-1] != 0 {
return nil, nil, err
}
endEncoded := e.endKey[:len(e.endKey)-1]
end, err = e.keyAdapter.Decode(nil, endEncoded)
endKey, err = e.tryDecodeEndKey(e.endKey)
if err != nil {
return nil, nil, err
}
return start, kv.Key(end).Next(), nil
return startKey, endKey, nil
}

// GetRegionSplitKeys implements common.Engine.
func (e *Engine) GetRegionSplitKeys() ([][]byte, error) {
splitKeys := make([][]byte, len(e.splitKeys))
var (
err error
splitKey []byte
)
for i, k := range e.splitKeys {
var err error
splitKeys[i], err = e.keyAdapter.Decode(nil, k)
if i < len(e.splitKeys)-1 {
splitKey, err = e.keyAdapter.Decode(nil, k)
} else {
splitKey, err = e.tryDecodeEndKey(k)
}
if err != nil {
return nil, err
}
splitKeys[i] = splitKey
}
return splitKeys, nil
}

// tryDecodeEndKey tries to decode the key from two sources.
// When duplicate detection feature is enabled, the **end key** comes from
// DupDetectKeyAdapter.Encode or Key.Next(). We try to decode it and check the
// error.
func (e Engine) tryDecodeEndKey(key []byte) (decoded []byte, err error) {
decoded, err = e.keyAdapter.Decode(nil, key)
if err == nil {
return
}
if _, ok := e.keyAdapter.(common.NoopKeyAdapter); ok {
// NoopKeyAdapter.Decode always return nil error
intest.Assert(false, "Unreachable code path")
return nil, err
}
// handle the case that end key is from Key.Next()
if key[len(key)-1] != 0 {
return nil, err
}
key = key[:len(key)-1]
decoded, err = e.keyAdapter.Decode(nil, key)
if err != nil {
return nil, err
}
return kv.Key(decoded).Next(), nil
}

// Close implements common.Engine.
func (e *Engine) Close() error {
if e.smallBlockBufPool != nil {
Expand Down
113 changes: 113 additions & 0 deletions pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -261,3 +262,115 @@ func TestGetAdjustedConcurrency(t *testing.T) {
e.dataFiles = genFiles(10000)
require.Equal(t, 1, e.getAdjustedConcurrency())
}

func TestTryDecodeEndKey(t *testing.T) {
encodedRowID := common.EncodeIntRowID(1)
e := &Engine{}

e.keyAdapter = common.DupDetectKeyAdapter{}
key := []byte("1234")
encodedKey0 := e.keyAdapter.Encode(nil, key, encodedRowID)
encodedKey1 := kv.Key(encodedKey0).Next()
encodedKey2 := make([]byte, len(encodedKey1))
copy(encodedKey2, encodedKey1)
encodedKey2[len(encodedKey2)-1] = 1
testcases := []struct {
encodedKey []byte
succeed bool
result []byte
}{
{encodedKey0, true, key},
{encodedKey1, true, kv.Key(key).Next()},
{encodedKey2, false, nil},
}
for _, tc := range testcases {
decoded, err := e.tryDecodeEndKey(tc.encodedKey)
if tc.succeed {
require.NoError(t, err)
require.Equal(t, tc.result, decoded)
} else {
require.Error(t, err)
}
}

e.keyAdapter = common.NoopKeyAdapter{}
encodedKey0 = e.keyAdapter.Encode(nil, key, encodedRowID)
encodedKey1 = kv.Key(encodedKey0).Next()
encodedKey2 = make([]byte, len(encodedKey1))
copy(encodedKey2, encodedKey1)
encodedKey2[len(encodedKey2)-1] = 1
testcases = []struct {
encodedKey []byte
succeed bool
result []byte
}{
{encodedKey0, true, encodedKey0},
{encodedKey1, true, encodedKey1},
{encodedKey2, true, encodedKey2},
}
for _, tc := range testcases {
decoded, err := e.tryDecodeEndKey(tc.encodedKey)
if tc.succeed {
require.NoError(t, err)
require.Equal(t, tc.result, decoded)
} else {
require.Error(t, err)
}
}
}

func TestGetRegionSplitKeys(t *testing.T) {
key1 := []byte("1234")
key2 := []byte("1235")
key3 := []byte("1236")
e := &Engine{}

e.keyAdapter = common.DupDetectKeyAdapter{}
encodedKey1 := e.keyAdapter.Encode(nil, key1, common.EncodeIntRowID(1))
encodedKey2 := e.keyAdapter.Encode(nil, key2, common.EncodeIntRowID(2))
encodedKey3 := e.keyAdapter.Encode(nil, key3, common.EncodeIntRowID(3))
encodedKey2Next := kv.Key(encodedKey2).Next()
encodedKey3Next := kv.Key(encodedKey3).Next()
testcases := []struct {
splitKeys [][]byte
succeed bool
expectedKeys [][]byte
}{
{
[][]byte{encodedKey1, encodedKey2, encodedKey3},
true,
[][]byte{key1, key2, key3},
},
{
[][]byte{encodedKey1, encodedKey2, encodedKey3Next},
true,
[][]byte{key1, key2, kv.Key(key3).Next()},
},
{
[][]byte{encodedKey1, encodedKey2Next, encodedKey3Next},
false,
nil,
},
}
for _, tc := range testcases {
e.splitKeys = tc.splitKeys
res, err := e.GetRegionSplitKeys()
if tc.succeed {
require.NoError(t, err)
require.Equal(t, len(tc.expectedKeys), len(res))
for i := range tc.expectedKeys {
require.Equal(t, res[i], tc.expectedKeys[i])
}
} else {
require.Error(t, err)
}
}

e.keyAdapter = common.NoopKeyAdapter{}
for _, tc := range testcases {
e.splitKeys = tc.splitKeys
res, err := e.GetRegionSplitKeys()
require.NoError(t, err)
require.Equal(t, len(tc.splitKeys), len(res))
}
}

0 comments on commit 5ba8cf9

Please sign in to comment.