Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

external: fix decoding the last key in split keys #59613

Merged
merged 8 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/metrics",
"//pkg/util",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/size",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
72 changes: 42 additions & 30 deletions pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -98,7 +99,6 @@ type Engine struct {
endKey []byte
jobKeys [][]byte
splitKeys [][]byte
regionSplitSize int64
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just remove useless field

smallBlockBufPool *membuf.Pool
largeBlockBufPool *membuf.Pool

Expand All @@ -110,8 +110,7 @@ type Engine struct {
// this flag also affects the strategy of loading data, either:
// less load routine + check and read hotspot file concurrently (add-index uses this one)
// more load routine + read each file using 1 reader (import-into uses this one)
checkHotspot bool
mergerIterConcurrency int
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DITTO

checkHotspot bool

keyAdapter common.KeyAdapter
duplicateDetection bool
Expand Down Expand Up @@ -348,19 +347,11 @@ func (e *Engine) loadBatchRegionData(ctx context.Context, jobKeys [][]byte, outC
prev = cur
}
// last range key may be a nextKey so we should try to remove the trailing 0 if decoding failed
nextKey := false
lastKey := jobKeys[len(jobKeys)-1]
cur, err4 := e.keyAdapter.Decode(nil, lastKey)
if err4 != nil && lastKey[len(lastKey)-1] == 0 {
nextKey = true
cur, err4 = e.keyAdapter.Decode(nil, lastKey[:len(lastKey)-1])
}
cur, err4 := e.tryDecodeEndKey(lastKey)
if err4 != nil {
return err4
}
if nextKey {
cur = kv.Key(cur).Next()
}
ranges = append(ranges, common.Range{
Start: prev,
End: cur,
Expand Down Expand Up @@ -438,43 +429,64 @@ func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error) {
return e.startKey, e.endKey, nil
}

// when duplicate detection feature is enabled, the end key comes from
// DupDetectKeyAdapter.Encode or Key.Next(). We try to decode it and check the
// error.

start, err := e.keyAdapter.Decode(nil, e.startKey)
startKey, err = e.keyAdapter.Decode(nil, e.startKey)
if err != nil {
return nil, nil, err
}
end, err := e.keyAdapter.Decode(nil, e.endKey)
if err == nil {
return start, end, nil
}
// handle the case that end key is from Key.Next()
if e.endKey[len(e.endKey)-1] != 0 {
return nil, nil, err
}
endEncoded := e.endKey[:len(e.endKey)-1]
end, err = e.keyAdapter.Decode(nil, endEncoded)
endKey, err = e.tryDecodeEndKey(e.endKey)
if err != nil {
return nil, nil, err
}
return start, kv.Key(end).Next(), nil
return startKey, endKey, nil
}

// GetRegionSplitKeys implements common.Engine.
func (e *Engine) GetRegionSplitKeys() ([][]byte, error) {
splitKeys := make([][]byte, len(e.splitKeys))
var (
err error
splitKey []byte
)
for i, k := range e.splitKeys {
var err error
splitKeys[i], err = e.keyAdapter.Decode(nil, k)
if i < len(e.splitKeys)-1 {
splitKey, err = e.keyAdapter.Decode(nil, k)
} else {
splitKey, err = e.tryDecodeEndKey(k)
}
if err != nil {
return nil, err
}
splitKeys[i] = splitKey
}
return splitKeys, nil
}

// tryDecodeEndKey tries to decode the key from two sources.
// When duplicate detection feature is enabled, the **end key** comes from
// DupDetectKeyAdapter.Encode or Key.Next(). We try to decode it and check the
// error.
func (e Engine) tryDecodeEndKey(key []byte) (decoded []byte, err error) {
decoded, err = e.keyAdapter.Decode(nil, key)
if err == nil {
return
}
if _, ok := e.keyAdapter.(common.NoopKeyAdapter); ok {
// NoopKeyAdapter.Decode always return nil error
intest.Assert(false, "Unreachable code path")
return nil, err
}
// handle the case that end key is from Key.Next()
if key[len(key)-1] != 0 {
return nil, err
}
key = key[:len(key)-1]
decoded, err = e.keyAdapter.Decode(nil, key)
if err != nil {
return nil, err
}
return kv.Key(decoded).Next(), nil
}

// Close implements common.Engine.
func (e *Engine) Close() error {
if e.smallBlockBufPool != nil {
Expand Down
113 changes: 113 additions & 0 deletions pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/membuf"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -261,3 +262,115 @@ func TestGetAdjustedConcurrency(t *testing.T) {
e.dataFiles = genFiles(10000)
require.Equal(t, 1, e.getAdjustedConcurrency())
}

func TestTryDecodeEndKey(t *testing.T) {
encodedRowID := common.EncodeIntRowID(1)
e := &Engine{}

e.keyAdapter = common.DupDetectKeyAdapter{}
key := []byte("1234")
encodedKey0 := e.keyAdapter.Encode(nil, key, encodedRowID)
encodedKey1 := kv.Key(encodedKey0).Next()
encodedKey2 := make([]byte, len(encodedKey1))
copy(encodedKey2, encodedKey1)
encodedKey2[len(encodedKey2)-1] = 1
testcases := []struct {
encodedKey []byte
succeed bool
result []byte
}{
{encodedKey0, true, key},
{encodedKey1, true, kv.Key(key).Next()},
{encodedKey2, false, nil},
}
for _, tc := range testcases {
decoded, err := e.tryDecodeEndKey(tc.encodedKey)
if tc.succeed {
require.NoError(t, err)
require.Equal(t, tc.result, decoded)
} else {
require.Error(t, err)
}
}

e.keyAdapter = common.NoopKeyAdapter{}
encodedKey0 = e.keyAdapter.Encode(nil, key, encodedRowID)
encodedKey1 = kv.Key(encodedKey0).Next()
encodedKey2 = make([]byte, len(encodedKey1))
copy(encodedKey2, encodedKey1)
encodedKey2[len(encodedKey2)-1] = 1
testcases = []struct {
encodedKey []byte
succeed bool
result []byte
}{
{encodedKey0, true, encodedKey0},
{encodedKey1, true, encodedKey1},
{encodedKey2, true, encodedKey2},
}
for _, tc := range testcases {
decoded, err := e.tryDecodeEndKey(tc.encodedKey)
if tc.succeed {
require.NoError(t, err)
require.Equal(t, tc.result, decoded)
} else {
require.Error(t, err)
}
}
}

func TestGetRegionSplitKeys(t *testing.T) {
key1 := []byte("1234")
key2 := []byte("1235")
key3 := []byte("1236")
e := &Engine{}

e.keyAdapter = common.DupDetectKeyAdapter{}
encodedKey1 := e.keyAdapter.Encode(nil, key1, common.EncodeIntRowID(1))
encodedKey2 := e.keyAdapter.Encode(nil, key2, common.EncodeIntRowID(2))
encodedKey3 := e.keyAdapter.Encode(nil, key3, common.EncodeIntRowID(3))
encodedKey2Next := kv.Key(encodedKey2).Next()
encodedKey3Next := kv.Key(encodedKey3).Next()
testcases := []struct {
splitKeys [][]byte
succeed bool
expectedKeys [][]byte
}{
{
[][]byte{encodedKey1, encodedKey2, encodedKey3},
true,
[][]byte{key1, key2, key3},
},
{
[][]byte{encodedKey1, encodedKey2, encodedKey3Next},
true,
[][]byte{key1, key2, kv.Key(key3).Next()},
},
{
[][]byte{encodedKey1, encodedKey2Next, encodedKey3Next},
false,
nil,
},
}
for _, tc := range testcases {
e.splitKeys = tc.splitKeys
res, err := e.GetRegionSplitKeys()
if tc.succeed {
require.NoError(t, err)
require.Equal(t, len(tc.expectedKeys), len(res))
for i := range tc.expectedKeys {
require.Equal(t, res[i], tc.expectedKeys[i])
}
} else {
require.Error(t, err)
}
}

e.keyAdapter = common.NoopKeyAdapter{}
for _, tc := range testcases {
e.splitKeys = tc.splitKeys
res, err := e.GetRegionSplitKeys()
require.NoError(t, err)
require.Equal(t, len(tc.splitKeys), len(res))
}
}