Skip to content

Commit

Permalink
add one ut to cover hash partition table scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 14, 2025
1 parent fda05d0 commit 3edb7e1
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
16 changes: 16 additions & 0 deletions pkg/sink/codec/canal/canal_json_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,22 @@ func (b *batchDecoder) setPhysicalTableID(event *model.RowChangedEvent, physical
return fmt.Errorf("cannot found partition for column value %s", columnValue)
// todo: support following rule if meet the corresponding workload
case pmodel.PartitionTypeHash:
targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", ""))
columns := event.Columns
if columns == nil {
columns = event.PreColumns
}
var columnValue int64
for _, col := range columns {
if col.ColumnID == targetColumnID {
columnValue = col.Value.(int64)
break
}
}
result := columnValue % int64(len(event.TableInfo.Partition.Definitions))
partitionID := event.TableInfo.GetPartitionInfo().Definitions[result].ID
event.PhysicalTableID = partitionID
return nil
case pmodel.PartitionTypeKey:
case pmodel.PartitionTypeList:
case pmodel.PartitionTypeNone:
Expand Down
53 changes: 53 additions & 0 deletions pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,59 @@ func TestCanalJSONContentCompatibleE2E(t *testing.T) {
}
}

func TestE2EPartitionTableByHash(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

createTableDDLEvent := helper.DDL2Event(`CREATE TABLE t (a INT,PRIMARY KEY(a)) PARTITION BY HASH (a) PARTITIONS 5`)
require.NotNil(t, createTableDDLEvent)
insertEvent := helper.DML2Event(`insert into t values (5)`, "test", "t", "p0")
require.NotNil(t, insertEvent)

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolCanalJSON)

builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
require.NoError(t, err)
encoder := builder.Build()

decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

message, err := encoder.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

tp, hasNext, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, tp)

decodedDDL, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedDDL)

err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil)
require.NoError(t, err)
message = encoder.Build()[0]

err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)
tp, hasNext, err = decoder.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeRow, tp)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.NotZero(t, decodedEvent.GetTableID())
require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID)
}

func TestE2EPartitionTableByRange(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()
Expand Down

0 comments on commit 3edb7e1

Please sign in to comment.