Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filter(ticdc): fix incorrect event filter with "rename" DDLs (#11956) #11973

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/filter/sql_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ func (f *sqlEventFilter) shouldSkipDDL(ddl *model.DDLEvent) (bool, error) {
if err != nil {
return false, err
}
if evenType == bf.RenameTable {
if ddl.PreTableInfo == nil {
log.Warn("sql event filter doesn't find old table info when the event type is `rename table`",
zap.String("schema", schema),
zap.String("table", table),
zap.Stringer("type", ddl.Type),
zap.String("query", ddl.Query))
return true, cerror.ErrTableIneligible.GenWithStackByArgs(ddl.TableInfo.TableName)
}
schema = ddl.PreTableInfo.TableName.Schema
table = ddl.PreTableInfo.TableName.Table
}
if evenType == bf.NullEvent {
log.Warn("sql event filter unsupported ddl type, do nothing",
zap.String("type", ddl.Type.String()),
Expand Down
166 changes: 115 additions & 51 deletions pkg/filter/sql_event_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/errors"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand All @@ -27,10 +28,13 @@ import (
func TestShouldSkipDDL(t *testing.T) {
t.Parallel()
type innerCase struct {
schema string
table string
query string
skip bool
schema string
table string
preSchema string
preTable string
query string
ddlType timodel.ActionType
skip bool
}

type testCase struct {
Expand All @@ -51,28 +55,32 @@ func TestShouldSkipDDL(t *testing.T) {
},
cases: []innerCase{
{
schema: "test",
table: "t1",
query: "alter table t1 modify column age int",
skip: true,
schema: "test",
table: "t1",
query: "alter table t1 modify column age int",
ddlType: timodel.ActionModifyColumn,
skip: true,
},
{
schema: "test",
table: "t1",
query: "create table t1(id int primary key)",
skip: true,
schema: "test",
table: "t1",
query: "create table t1(id int primary key)",
ddlType: timodel.ActionCreateTable,
skip: true,
},
{
schema: "test",
table: "t2", // table name not match
query: "alter table t2 modify column age int",
skip: false,
schema: "test",
table: "t2", // table name not match
query: "alter table t2 modify column age int",
ddlType: timodel.ActionModifyColumn,
skip: false,
},
{
schema: "test2", // schema name not match
table: "t1",
query: "alter table t1 modify column age int",
skip: false,
schema: "test2", // schema name not match
table: "t1",
query: "alter table t1 modify column age int",
ddlType: timodel.ActionModifyColumn,
skip: false,
},
},
},
Expand All @@ -87,28 +95,32 @@ func TestShouldSkipDDL(t *testing.T) {
},
cases: []innerCase{
{
schema: "test",
table: "t1",
query: "alter table t1 modify column age int",
skip: false,
schema: "test",
table: "t1",
query: "alter table t1 modify column age int",
ddlType: timodel.ActionModifyColumn,
skip: false,
},
{
schema: "test",
table: "t1",
query: "alter table t1 drop column age",
skip: false,
schema: "test",
table: "t1",
query: "alter table t1 drop column age",
ddlType: timodel.ActionModifyColumn,
skip: false,
},
{
schema: "test2",
table: "t1",
query: "drop database test2",
skip: true,
schema: "test2",
table: "t1",
query: "drop database test2",
ddlType: timodel.ActionDropSchema,
skip: true,
},
{
schema: "test3",
table: "t1",
query: "drop index i3 on t1",
skip: false,
schema: "test3",
table: "t1",
query: "drop index i3 on t1",
ddlType: timodel.ActionDropIndex,
skip: false,
},
},
},
Expand All @@ -123,27 +135,31 @@ func TestShouldSkipDDL(t *testing.T) {
},
cases: []innerCase{
{
schema: "test",
table: "t1",
query: "ALTER TABLE t1 MODIFY COLUMN age int(11) NOT NULL",
skip: true,
schema: "test",
table: "t1",
query: "ALTER TABLE t1 MODIFY COLUMN age int(11) NOT NULL",
ddlType: timodel.ActionModifyColumn,
skip: true,
},
{
schema: "test",
table: "t1",
query: "ALTER TABLE t1 DROP COLUMN age",
skip: true,
schema: "test",
table: "t1",
query: "ALTER TABLE t1 DROP COLUMN age",
ddlType: timodel.ActionDropColumn,
skip: true,
},
{ // no table name
schema: "test2",
query: "DROP DATABASE test",
skip: true,
schema: "test2",
query: "DROP DATABASE test",
ddlType: timodel.ActionDropSchema,
skip: true,
},
{
schema: "test3",
table: "t1",
query: "Drop Index i1 on test3.t1",
skip: false,
schema: "test3",
table: "t1",
query: "Drop Index i1 on test3.t1",
ddlType: timodel.ActionDropIndex,
skip: false,
},
},
},
Expand All @@ -168,6 +184,47 @@ func TestShouldSkipDDL(t *testing.T) {
},
},
},
// rename table
{
cfg: &config.FilterConfig{
Rules: []string{"*.t1", "*.t2"},
EventFilters: []*config.EventFilterRule{
{
Matcher: []string{"*.t1"},
IgnoreEvent: []bf.EventType{bf.AllDDL},
},
},
},
cases: []innerCase{
{
schema: "test",
table: "t2",
preSchema: "test",
preTable: "t1",
query: "RENAME TABLE t1 to t2",
ddlType: timodel.ActionRenameTable,
skip: true,
},
{
schema: "test",
table: "t3",
preSchema: "test",
preTable: "t1",
query: "RENAME TABLE t1 to t3",
ddlType: timodel.ActionRenameTable,
skip: true,
},
{
schema: "test",
table: "t1",
preSchema: "test",
preTable: "t2",
query: "RENAME TABLE t2 to t1",
ddlType: timodel.ActionRenameTable,
skip: false,
},
},
},
}

for _, tc := range testCases {
Expand All @@ -181,7 +238,14 @@ func TestShouldSkipDDL(t *testing.T) {
Table: c.table,
},
},
PreTableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: c.preSchema,
Table: c.preTable,
},
},
Query: c.query,
Type: c.ddlType,
}
skip, err := f.shouldSkipDDL(ddl)
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions tests/integration_tests/event_filter/conf/cf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ ignore-event = ["truncate table"]
[[filter.event-filters]]
matcher = ["event_filter.t_alter"]
ignore-event = ["alter table"]

[[filter.event-filters]]
matcher = ["event_filter.t_name*"]
ignore-event = ["rename table"]
2 changes: 1 addition & 1 deletion tests/integration_tests/event_filter/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ source-instances = ["tidb0"]

target-instance = "mysql1"

target-check-tables = ["event_filter.t_normal", "event_filter.t_truncate", "event_filter.t_alter"]
target-check-tables = ["event_filter.t_normal", "event_filter.t_truncate", "event_filter.t_alter", "event_filter.t_name*", "event_filter.t_rename*"]

[data-sources]
[data-sources.tidb0]
Expand Down
23 changes: 22 additions & 1 deletion tests/integration_tests/event_filter/data/test.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,25 @@ CREATE TABLE t_alter
) ENGINE = InnoDB
DEFAULT CHARSET = utf8
COLLATE = utf8_bin;



CREATE TABLE t_name (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name1 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name2 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
CREATE TABLE t_name3 (
id INT,
name varchar(128),
PRIMARY KEY (id)
);
27 changes: 27 additions & 0 deletions tests/integration_tests/event_filter/data/test_rename.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
USE `event_filter`;

RENAME TABLE t_name TO t_rename;

INSERT INTO t_rename
VALUES (1, 'guagua');

INSERT INTO t_rename
VALUES (2, 'huahua');

INSERT INTO t_rename
VALUES (3, 'xigua');

INSERT INTO t_rename
VALUES (4, 'yuko');

-- rename tables
RENAME TABLE t_name1 TO t_rename1, t_name2 TO t_rename2, t_name3 TO t_rename3;

INSERT INTO t_rename1
VALUES (1, 'guagua');

INSERT INTO t_rename2
VALUES (2, 'huahua');

INSERT INTO t_rename3
VALUES (3, 'xigua');
9 changes: 9 additions & 0 deletions tests/integration_tests/event_filter/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ function run() {
check_table_exists "event_filter.t_normal" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_truncate" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_alter" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.t_name3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

# check those rows that are not filtered are synced to downstream
run_sql "select count(1) from event_filter.t1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
Expand All @@ -54,6 +58,11 @@ function run() {
run_sql_file $CUR/data/test_truncate.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE event_filter.t_alter MODIFY t_bigint BIGINT;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql_file $CUR/data/test_alter.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name TO event_filter.t_rename;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name1 TO event_filter.t_rename1;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name2 TO event_filter.t_rename2;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql "RENAME TABLE event_filter.t_name3 TO event_filter.t_rename3;" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
run_sql_file $CUR/data/test_rename.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "create table event_filter.finish_mark(id int primary key);" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists "event_filter.finish_mark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

Expand Down
Loading