Skip to content

Commit

Permalink
filter(ticdc): fix incorrect event filter with "rename" DDLs (#11956) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 6, 2025
1 parent e7de6be commit f7cbd8d
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 8 deletions.
14 changes: 13 additions & 1 deletion pkg/filter/sql_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,21 @@ func (f *sqlEventFilter) shouldSkipDDL(ddl *model.DDLEvent) (skip bool, err erro
if len(f.rules) == 0 {
return false, nil
}
evenType := ddlToEventType(ddl.Type)
schema := ddl.TableInfo.TableName.Schema
table := ddl.TableInfo.TableName.Table
evenType := ddlToEventType(ddl.Type)
if evenType == bf.RenameTable {
if ddl.PreTableInfo == nil {
log.Warn("sql event filter doesn't find old table info when the event type is `rename table`",
zap.String("schema", schema),
zap.String("table", table),
zap.Stringer("type", ddl.Type),
zap.String("query", ddl.Query))
return true, cerror.ErrTableIneligible.GenWithStackByArgs(ddl.TableInfo.TableName)
}
schema = ddl.PreTableInfo.TableName.Schema
table = ddl.PreTableInfo.TableName.Table
}
if evenType == bf.NullEvent {
log.Warn("sql event filter unsupported ddl type, do nothing",
zap.String("type", ddl.Type.String()),
Expand Down
80 changes: 75 additions & 5 deletions pkg/filter/sql_event_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ import (
func TestShouldSkipDDL(t *testing.T) {
t.Parallel()
type innerCase struct {
schema string
table string
query string
ddlType timodel.ActionType
skip bool
schema string
table string
preSchema string
preTable string
query string
ddlType timodel.ActionType
skip bool
}

type testCase struct {
Expand Down Expand Up @@ -286,6 +288,74 @@ func TestShouldSkipDDL(t *testing.T) {
Type: c.ddlType,
}
skip, err := f.shouldSkipDDL(ddl)
if c.ddlType == timodel.ActionRenameTable || c.ddlType == timodel.ActionRenameTables {
require.ErrorIs(t, err, cerror.ErrTableIneligible)
require.Equal(t, true, skip, "case: %+v", c)
} else {
require.NoError(t, err)
require.Equal(t, c.skip, skip, "case: %+v", c)
}
}
case7 := testCase{
cfg: &config.FilterConfig{
Rules: []string{"*.t1", "*.t2"},
EventFilters: []*config.EventFilterRule{
{
Matcher: []string{"*.t1"},
IgnoreEvent: allEventTypes,
},
},
},
cases: []innerCase{
{
schema: "test",
table: "t2",
preSchema: "test",
preTable: "t1",
query: "no matter",
ddlType: timodel.ActionRenameTable,
skip: true,
},
{
schema: "test",
table: "t3",
preSchema: "test",
preTable: "t1",
query: "no matter",
ddlType: timodel.ActionRenameTable,
skip: true,
},
{
schema: "test",
table: "t1",
preSchema: "test",
preTable: "t2",
query: "no matter",
ddlType: timodel.ActionRenameTable,
skip: false,
},
},
}
f, err = newSQLEventFilter(case7.cfg)
require.NoError(t, err)
for _, c := range case7.cases {
ddl := &model.DDLEvent{
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: c.schema,
Table: c.table,
},
},
PreTableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: c.preSchema,
Table: c.preTable,
},
},
Query: c.query,
Type: c.ddlType,
}
skip, err := f.shouldSkipDDL(ddl)
require.NoError(t, err)
require.Equal(t, c.skip, skip, "case: %+v", c)
}
Expand Down
4 changes: 4 additions & 0 deletions tests/integration_tests/event_filter/conf/cf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ ignore-event = ["truncate table"]
[[filter.event-filters]]
matcher = ["event_filter.t_alter"]
ignore-event = ["alter table"]

[[filter.event-filters]]
matcher = ["event_filter.t_name*"]
ignore-event = ["rename table"]
2 changes: 1 addition & 1 deletion tests/integration_tests/event_filter/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ source-instances = ["tidb0"]

target-instance = "mysql1"

target-check-tables = ["event_filter.t_normal", "event_filter.t_truncate", "event_filter.t_alter"]
target-check-tables = ["event_filter.t_normal", "event_filter.t_truncate", "event_filter.t_alter", "event_filter.t_name*", "event_filter.t_rename*"]

[data-sources]
[data-sources.tidb0]
Expand Down
23 changes: 22 additions & 1 deletion tests/integration_tests/event_filter/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,25 @@ CREATE TABLE t_alter
) ENGINE = InnoDB
DEFAULT CHARSET = utf8
COLLATE = utf8_bin;



CREATE TABLE t_name (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name1 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name2 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name3 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
27 changes: 27 additions & 0 deletions tests/integration_tests/event_filter/data/test_rename.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
USE `event_filter`;

RENAME TABLE t_name TO t_rename;

INSERT INTO t_rename
VALUES (1, 'guagua');

INSERT INTO t_rename
VALUES (2, 'huahua');

INSERT INTO t_rename
VALUES (3, 'xigua');

INSERT INTO t_rename
VALUES (4, 'yuko');

-- rename tables
RENAME TABLE t_name1 TO t_rename1, t_name2 TO t_rename2, t_name3 TO t_rename3;

INSERT INTO t_rename1
VALUES (1, 'guagua');

INSERT INTO t_rename2
VALUES (2, 'huahua');

INSERT INTO t_rename3
VALUES (3, 'xigua');
9 changes: 9 additions & 0 deletions tests/integration_tests/event_filter/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ function run() {
check_table_exists "event_filter.t_normal" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_truncate" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_alter" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# check those rows that are not filtered are synced to downstream
run_sql "select count(1) from event_filter.t1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
Expand All @@ -61,6 +65,11 @@ function run() {
run_sql_file $CUR/data/test_truncate.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE event_filter.t_alter MODIFY t_bigint BIGINT;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql_file $CUR/data/test_alter.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name TO event_filter.t_rename;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name1 TO event_filter.t_rename1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name2 TO event_filter.t_rename2;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name3 TO event_filter.t_rename3;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql_file $CUR/data/test_rename.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "create table event_filter.finish_mark(id int primary key);" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

Expand Down

0 comments on commit f7cbd8d

Please sign in to comment.