Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv (ticdc): fix kvClient reconnection downhill loop (#10559) #10572

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@
package entry

import (
<<<<<<< HEAD

Check failure on line 17 in cdc/entry/schema_test_helper.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

missing import path

Check failure on line 17 in cdc/entry/schema_test_helper.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

missing import path
=======
"context"
>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559))
"encoding/json"
"strings"
"testing"

<<<<<<< HEAD
ticonfig "github.com/pingcap/tidb/config"
tiddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
Expand All @@ -27,6 +32,23 @@
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/testkit"
=======
ticonfig "github.com/pingcap/tidb/pkg/config"
tiddl "github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
timeta "github.com/pingcap/tidb/pkg/meta"
timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/util"
>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559))
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
Expand Down Expand Up @@ -127,6 +149,112 @@
return jobs
}

<<<<<<< HEAD
=======
// DML2Event execute the dml and return the corresponding row changed event.
// caution: it does not support `delete` since the key value cannot be found
// after the query executed.
func (s *SchemaTestHelper) DML2Event(dml string, schema, table string) *model.RowChangedEvent {
s.tk.MustExec(dml)

tableInfo, ok := s.schemaStorage.GetLastSnapshot().TableByName(schema, table)
require.True(s.t, ok)

key, value := s.getLastKeyValue(tableInfo.ID)
ts := s.schemaStorage.GetLastSnapshot().CurrentTs()
rawKV := &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
OldValue: nil,
StartTs: ts - 1,
CRTs: ts + 1,
}
polymorphicEvent := model.NewPolymorphicEvent(rawKV)
err := s.mounter.DecodeEvent(context.Background(), polymorphicEvent)
require.NoError(s.t, err)
return polymorphicEvent.Row
}

func (s *SchemaTestHelper) getLastKeyValue(tableID int64) (key, value []byte) {
txn, err := s.storage.Begin()
require.NoError(s.t, err)
defer txn.Rollback() //nolint:errcheck

start, end := spanz.GetTableRange(tableID)
iter, err := txn.Iter(start, end)
require.NoError(s.t, err)
defer iter.Close()
for iter.Valid() {
key = iter.Key()
value = iter.Value()
err = iter.Next()
require.NoError(s.t, err)
}
return key, value
}

// DDL2Event executes the DDL and return the corresponding event.
func (s *SchemaTestHelper) DDL2Event(ddl string) *model.DDLEvent {
s.tk.MustExec(ddl)
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
require.NoError(s.t, err)
require.Len(s.t, jobs, 1)
// Set State from Synced to Done.
// Because jobs are put to history queue after TiDB alter its state from
// Done to Synced.
jobs[0].State = timodel.JobStateDone
res := jobs[0]
if res.Type == timodel.ActionRenameTables {
// the RawArgs field in job fetched from tidb snapshot meta is incorrent,
// so we manually construct `job.RawArgs` to do the workaround.
// we assume the old schema name is same as the new schema name here.
// for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", schema name is "test"
schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0]
tableNum := len(res.BinlogInfo.MultipleTableInfos)
oldSchemaIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaIDs[i] = res.SchemaID
}
oldTableIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID
}
newTableNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name
}
oldSchemaNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaNames[i] = timodel.NewCIStr(schema)
}
newSchemaIDs := oldSchemaIDs

args := []interface{}{
oldSchemaIDs, newSchemaIDs,
newTableNames, oldTableIDs, oldSchemaNames,
}
rawArgs, err := json.Marshal(args)
require.NoError(s.t, err)
res.RawArgs = rawArgs
}

err = s.schemaStorage.HandleDDLJob(res)
require.NoError(s.t, err)

ver, err := s.storage.CurrentVersion(oracle.GlobalTxnScope)
require.NoError(s.t, err)
s.schemaStorage.AdvanceResolvedTs(ver.Ver)

ctx := context.Background()

events, err := s.schemaStorage.BuildDDLEvents(ctx, res)
require.NoError(s.t, err)

return events[0]
}

>>>>>>> 98adc64c8d (kv (ticdc): fix kvClient reconnection downhill loop (#10559))
// Storage returns the tikv storage
func (s *SchemaTestHelper) Storage() kv.Storage {
return s.storage
Expand Down
Loading
Loading