Skip to content

Commit

Permalink
codec(ticdc): Revert "codec(ticdc): canal-json encode table id (#11875)…
Browse files Browse the repository at this point in the history
…" (#11992)

close #11993
  • Loading branch information
3AceShowHand authored Jan 14, 2025
1 parent 5b4b44e commit fa5baad
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 99 deletions.
51 changes: 24 additions & 27 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ type canalJSONMessageInterface interface {
getCommitTs() uint64
getPhysicalTableID() int64
getTableID() int64
isPartition() bool
getQuery() string
getOld() map[string]interface{}
getData() map[string]interface{}
Expand Down Expand Up @@ -96,10 +95,6 @@ func (c *JSONMessage) getPhysicalTableID() int64 {
return 0
}

func (c *JSONMessage) isPartition() bool {
return false
}

func (c *JSONMessage) getQuery() string {
return c.Query
}
Expand Down Expand Up @@ -152,8 +147,6 @@ func (c *JSONMessage) pkNameSet() map[string]struct{} {

type tidbExtension struct {
CommitTs uint64 `json:"commitTs,omitempty"`
TableID int64 `json:"tableId,omitempty"`
PhysicalTableID int64 `json:"partitionId,omitempty"`
WatermarkTs uint64 `json:"watermarkTs,omitempty"`
OnlyHandleKey bool `json:"onlyHandleKey,omitempty"`
ClaimCheckLocation string `json:"claimCheckLocation,omitempty"`
Expand All @@ -172,21 +165,6 @@ func (c *canalJSONMessageWithTiDBExtension) getCommitTs() uint64 {
return c.Extensions.CommitTs
}

func (c *canalJSONMessageWithTiDBExtension) getTableID() int64 {
return c.Extensions.TableID
}

func (c *canalJSONMessageWithTiDBExtension) getPhysicalTableID() int64 {
if c.Extensions.PhysicalTableID != 0 {
return c.Extensions.PhysicalTableID
}
return c.Extensions.TableID
}

func (c *canalJSONMessageWithTiDBExtension) isPartition() bool {
return c.Extensions.PhysicalTableID != 0
}

func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
schema := *msg.getSchema()
table := *msg.getTable()
Expand Down Expand Up @@ -249,18 +227,39 @@ func (b *batchDecoder) setPhysicalTableID(event *model.RowChangedEvent, physical
}
}
for _, partition := range event.TableInfo.Partition.Definitions {
if partition.LessThan[0] == "MAXVALUE" {
lessThan := partition.LessThan[0]
if lessThan == "MAXVALUE" {
event.PhysicalTableID = partition.ID
return nil
}
if len(columnValue) < len(lessThan) {
event.PhysicalTableID = partition.ID
return nil
}
if strings.Compare(columnValue, partition.LessThan[0]) == -1 {
if strings.Compare(columnValue, lessThan) == -1 {
event.PhysicalTableID = partition.ID
return nil
}
}
return fmt.Errorf("cannot found partition for column value %s", columnValue)
// todo: support following rule if meet the corresponding workload
case pmodel.PartitionTypeHash:
targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", ""))
columns := event.Columns
if columns == nil {
columns = event.PreColumns
}
var columnValue int64
for _, col := range columns {
if col.ColumnID == targetColumnID {
columnValue = col.Value.(int64)
break
}
}
result := columnValue % int64(len(event.TableInfo.Partition.Definitions))
partitionID := event.TableInfo.GetPartitionInfo().Definitions[result].ID
event.PhysicalTableID = partitionID
return nil
case pmodel.PartitionTypeKey:
case pmodel.PartitionTypeList:
case pmodel.PartitionTypeNone:
Expand All @@ -274,10 +273,8 @@ func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, err
result := new(model.RowChangedEvent)
result.TableInfo = b.queryTableInfo(msg)
result.CommitTs = msg.getCommitTs()
mysqlType := msg.getMySQLType()
result.TableInfo.TableName.IsPartition = msg.isPartition()
result.TableInfo.TableName.TableID = msg.getTableID()

mysqlType := msg.getMySQLType()
var err error
if msg.eventType() == canal.EventType_DELETE {
// for `DELETE` event, `data` contain the old data, set it as the `PreColumns`
Expand Down
12 changes: 0 additions & 12 deletions pkg/sink/codec/canal/canal_json_row_event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,6 @@ func newJSONMessageForDML(
out.RawByte('{')
out.RawString("\"commitTs\":")
out.Uint64(e.CommitTs)
out.RawByte(',')

// the logical table id
out.RawString("\"tableId\":")
out.Int64(e.TableInfo.ID)

// the physical table id
if e.TableInfo.IsPartitionTable() {
out.RawByte(',')
out.RawString("\"partitionId\":")
out.Int64(e.GetTableID())
}

// only send handle key may happen in 2 cases:
// 1. delete event, and set only handle key config. no need to encode `onlyHandleKey` field
Expand Down
188 changes: 133 additions & 55 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,9 @@ func TestDMLE2E(t *testing.T) {

require.True(t, decodedEvent.IsInsert())
if enableTiDBExtension {
require.Equal(t, insertEvent.CommitTs, decodedEvent.CommitTs)
require.Equal(t, insertEvent.GetTableID(), decodedEvent.GetTableID())
} else {
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.ID)
require.Equal(t, insertEvent.GetCommitTs(), decodedEvent.GetCommitTs())
}
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedEvent.TableInfo.GetSchemaName())
require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedEvent.TableInfo.GetTableName())

Expand Down Expand Up @@ -132,11 +129,6 @@ func TestDMLE2E(t *testing.T) {
decodedEvent, err = decoder.NextRowChangedEvent()
require.NoError(t, err)
require.True(t, decodedEvent.IsUpdate())
if enableTiDBExtension {
require.Equal(t, updateEvent.CommitTs, decodedEvent.CommitTs)
require.Equal(t, updateEvent.GetTableID(), decodedEvent.GetTableID())
require.Equal(t, updateEvent.TableInfo.IsPartitionTable(), decodedEvent.TableInfo.IsPartitionTable())
}

err = encoder.AppendRowChangedEvent(ctx, "", deleteEvent, func() {})
require.NoError(t, err)
Expand All @@ -153,11 +145,6 @@ func TestDMLE2E(t *testing.T) {
decodedEvent, err = decoder.NextRowChangedEvent()
require.NoError(t, err)
require.True(t, decodedEvent.IsDelete())
if enableTiDBExtension {
require.Equal(t, deleteEvent.CommitTs, decodedEvent.CommitTs)
require.Equal(t, deleteEvent.GetTableID(), decodedEvent.GetTableID())
require.Equal(t, deleteEvent.TableInfo.IsPartitionTable(), decodedEvent.TableInfo.IsPartitionTable())
}
}
}

Expand Down Expand Up @@ -278,8 +265,6 @@ func TestCanalJSONClaimCheckE2E(t *testing.T) {
require.NoError(t, err, rawValue)

require.Equal(t, insertEvent.CommitTs, decodedLargeEvent.CommitTs)
require.Equal(t, insertEvent.GetTableID(), decodedLargeEvent.GetTableID())
require.Equal(t, insertEvent.TableInfo.IsPartitionTable(), decodedLargeEvent.TableInfo.IsPartitionTable())
require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedLargeEvent.TableInfo.GetSchemaName())
require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedLargeEvent.TableInfo.GetTableName())
require.Nil(t, nil, decodedLargeEvent.PreColumns)
Expand Down Expand Up @@ -547,7 +532,6 @@ func TestDDLEventWithExtension(t *testing.T) {
require.NoError(t, err)
require.Equal(t, ddlEvent.Query, decodedDDL.Query)
require.Equal(t, ddlEvent.CommitTs, decodedDDL.CommitTs)
require.Equal(t, ddlEvent.TableInfo.IsPartitionTable(), decodedDDL.TableInfo.IsPartitionTable())
require.Equal(t, ddlEvent.TableInfo.TableName.Schema, decodedDDL.TableInfo.TableName.Schema)
require.Equal(t, ddlEvent.TableInfo.TableName.Table, decodedDDL.TableInfo.TableName.Table)
}
Expand Down Expand Up @@ -645,29 +629,26 @@ func TestMaxMessageBytes(t *testing.T) {
ctx := context.Background()
topic := ""

codecConfig := common.NewConfig(config.ProtocolCanalJSON)
for _, enableTiDBExtension := range []bool{true, false} {
codecConfig.EnableTiDBExtension = enableTiDBExtension
// the test message length is smaller than max-message-bytes
codecConfig.WithMaxMessageBytes(300)
// the test message length is smaller than max-message-bytes
maxMessageBytes := 300
codecConfig := common.NewConfig(config.ProtocolCanalJSON).WithMaxMessageBytes(maxMessageBytes)

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()
builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

err = encoder.AppendRowChangedEvent(ctx, topic, row, nil)
require.NoError(t, err)
err = encoder.AppendRowChangedEvent(ctx, topic, row, nil)
require.NoError(t, err)

// the test message length is larger than max-message-bytes
codecConfig = codecConfig.WithMaxMessageBytes(100)
// the test message length is larger than max-message-bytes
codecConfig = codecConfig.WithMaxMessageBytes(100)

builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
builder, err = NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)

encoder = builder.Build()
err = encoder.AppendRowChangedEvent(ctx, topic, row, nil)
require.Error(t, err, cerror.ErrMessageTooLarge)
}
encoder = builder.Build()
err = encoder.AppendRowChangedEvent(ctx, topic, row, nil)
require.Error(t, err, cerror.ErrMessageTooLarge)
}

func TestCanalJSONContentCompatibleE2E(t *testing.T) {
Expand Down Expand Up @@ -748,6 +729,116 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {
}
}

func TestE2EPartitionTableByHash(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

createTableDDLEvent := helper.DDL2Event(`CREATE TABLE t (a INT,PRIMARY KEY(a)) PARTITION BY HASH (a) PARTITIONS 5`)
require.NotNil(t, createTableDDLEvent)
insertEvent := helper.DML2Event(`insert into t values (5)`, "test", "t", "p0")
require.NotNil(t, insertEvent)

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

message, err := encoder.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, tp)

decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedDDL)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil)
require.NoError(t, err)
message = encoder.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err = decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID)
}

func TestE2EPartitionTableByRange(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

createTableDDLEvent := helper.DDL2Event(`create table t (id int primary key, a int) PARTITION BY RANGE ( id ) (
PARTITION p0 VALUES LESS THAN (6),
PARTITION p1 VALUES LESS THAN (11),
PARTITION p2 VALUES LESS THAN (21))`)
require.NotNil(t, createTableDDLEvent)

insertEvent := helper.DML2Event(`insert into t (id) values (6)`, "test", "t", "p1")
require.NotNil(t, insertEvent)

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

message, err := encoder.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, tp)

decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedDDL)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil)
require.NoError(t, err)
message = encoder.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err = decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID)
}

func TestE2EPartitionTable(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand Down Expand Up @@ -809,13 +900,8 @@ func TestE2EPartitionTable(t *testing.T) {

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)

if enableTiDBExtension {
require.Equal(t, decodedEvent.GetTableID(), insertEvent.GetTableID())
} else {
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID)
}
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent1, nil)
require.NoError(t, err)
Expand All @@ -831,12 +917,8 @@ func TestE2EPartitionTable(t *testing.T) {
decodedEvent, err = decoder.NextRowChangedEvent()
require.NoError(t, err)

if enableTiDBExtension {
require.Equal(t, decodedEvent.GetTableID(), insertEvent1.GetTableID())
} else {
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID)
}
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent2, nil)
require.NoError(t, err)
Expand All @@ -852,11 +934,7 @@ func TestE2EPartitionTable(t *testing.T) {
decodedEvent, err = decoder.NextRowChangedEvent()
require.NoError(t, err)

if enableTiDBExtension {
require.Equal(t, decodedEvent.GetTableID(), insertEvent2.GetTableID())
} else {
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[2].ID)
}
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[2].ID)
}
}
1 change: 0 additions & 1 deletion pkg/sink/codec/canal/canal_json_txn_event_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func (d *canalJSONTxnEventDecoder) canalJSONMessage2RowChange() (*model.RowChang
result := new(model.RowChangedEvent)
result.TableInfo = newTableInfo(msg, nil)
result.CommitTs = msg.getCommitTs()
result.PhysicalTableID = msg.getPhysicalTableID()

mysqlType := msg.getMySQLType()
var err error
Expand Down
Loading

0 comments on commit fa5baad

Please sign in to comment.