From dec6591c20d2f2fa801a8aaffc278eda795fc781 Mon Sep 17 00:00:00 2001
From: Jiaqiang Huang
Date: Thu, 9 Jan 2025 13:24:43 +0800
Subject: [PATCH] dm: add system table filter default for dump task (#11985)

close pingcap/tiflow#11984
---
 dm/dumpling/dumpling.go                    | 18 ++++++--
 dm/dumpling/dumpling_test.go               | 49 ++++++++++++++++++++++
 dm/pkg/dumpling/utils.go                   |  8 ++--
 dm/tests/openapi/client/openapi_task_check | 37 +++++++++++++++-
 dm/tests/openapi/run.sh                    | 41 +++++++++++-------
 5 files changed, 129 insertions(+), 24 deletions(-)

diff --git a/dm/dumpling/dumpling.go b/dm/dumpling/dumpling.go
index 22ab66a1228..5d8540a5af9 100644
--- a/dm/dumpling/dumpling.go
+++ b/dm/dumpling/dumpling.go
@@ -335,9 +335,21 @@ func (m *Dumpling) constructArgs(ctx context.Context) (*export.Config, error) {
 	dumpConfig.Password = db.Password
 	dumpConfig.OutputDirPath = cfg.Dir // use LoaderConfig.Dir as output dir
 	dumpConfig.CollationCompatible = cfg.CollationCompatible
-	tableFilter, err := filter.ParseMySQLReplicationRules(cfg.BAList)
-	if err != nil {
-		return nil, err
+	var (
+		tableFilter filter.Filter
+		err         error
+	)
+	if cfg.BAList == nil {
+		// If no block-allow-list is set, system tables are filtered by default.
+		tableFilter, err = filter.Parse(dutils.DefaultTableFilter)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		tableFilter, err = filter.ParseMySQLReplicationRules(cfg.BAList)
+		if err != nil {
+			return nil, err
+		}
 	}
 	dumpConfig.TableFilter = tableFilter
 	dumpConfig.CompleteInsert = true // always keep column name in `INSERT INTO` statements.
diff --git a/dm/dumpling/dumpling_test.go b/dm/dumpling/dumpling_test.go
index 5aaab26c7b9..52d40367d79 100644
--- a/dm/dumpling/dumpling_test.go
+++ b/dm/dumpling/dumpling_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tiflow/dm/config"
 	"github.com/pingcap/tiflow/dm/pb"
 	"github.com/pingcap/tiflow/dm/pkg/conn"
+	dutils "github.com/pingcap/tiflow/dm/pkg/dumpling"
 	"github.com/pingcap/tiflow/dm/pkg/log"
 	"github.com/pingcap/tiflow/engine/pkg/promutil"
 	"github.com/prometheus/client_golang/prometheus"
@@ -235,3 +236,51 @@ func genDumpCfg(t *testing.T) *config.SubTaskConfig {
 		},
 	}
 }
+
+func TestBAlist(t *testing.T) {
+	ctx := context.Background()
+
+	// case sensitive and set block-allow-list
+	cfg := genDumpCfg(t)
+	cfg.CaseSensitive = true
+	m := NewDumpling(cfg)
+	err := m.Init(ctx)
+	require.NoError(t, err)
+	tableFilter, err := tfilter.ParseMySQLReplicationRules(cfg.BAList)
+	require.NoError(t, err)
+	require.Equal(t, tableFilter, m.dumpConfig.TableFilter)
+
+	// case insensitive and set block-allow-list
+	cfg = genDumpCfg(t)
+	cfg.CaseSensitive = false
+	m = NewDumpling(cfg)
+	err = m.Init(ctx)
+	require.NoError(t, err)
+	tableFilter, err = tfilter.ParseMySQLReplicationRules(cfg.BAList)
+	require.NoError(t, err)
+	tableFilter = tfilter.CaseInsensitive(tableFilter)
+	require.Equal(t, tableFilter, m.dumpConfig.TableFilter)
+
+	// case sensitive and not set block-allow-list
+	cfg = genDumpCfg(t)
+	cfg.BAList = nil
+	cfg.CaseSensitive = true
+	m = NewDumpling(cfg)
+	err = m.Init(ctx)
+	require.NoError(t, err)
+	tableFilter, err = tfilter.Parse(dutils.DefaultTableFilter)
+	require.NoError(t, err)
+	require.Equal(t, tableFilter, m.dumpConfig.TableFilter)
+
+	// case insensitive and not set block-allow-list
+	cfg = genDumpCfg(t)
+	cfg.BAList = nil
+	cfg.CaseSensitive = false
+	m = NewDumpling(cfg)
+	err = m.Init(ctx)
+	require.NoError(t, err)
+	tableFilter, err = tfilter.Parse(dutils.DefaultTableFilter)
+	require.NoError(t, err)
+	tableFilter = tfilter.CaseInsensitive(tableFilter)
+	require.Equal(t, tableFilter, m.dumpConfig.TableFilter)
+}
diff --git a/dm/pkg/dumpling/utils.go b/dm/pkg/dumpling/utils.go
index 01efb4b4f8d..654c9a59afe 100644
--- a/dm/pkg/dumpling/utils.go
+++ b/dm/pkg/dumpling/utils.go
@@ -35,7 +35,9 @@ import (
 )
 
 // DefaultTableFilter is the default table filter for dumpling.
-var DefaultTableFilter = []string{"*.*", export.DefaultTableFilter}
+// Different from Dumpling, DM's case sensitivity is determined by the upstream's `lower_case_table_names` parameter,
+// so both uppercase and lowercase system tables are filtered.
+var DefaultTableFilter = []string{"*.*", export.DefaultTableFilter, "!/^(information_schema|performance_schema|metrics_schema|inspection_schema)$/.*"}
 
 // ParseMetaData parses mydumper's output meta file and returns binlog location.
 // since v2.0.0, dumpling maybe configured to output master status after connection pool is established,
@@ -252,7 +254,7 @@ func ParseExtraArgs(logger *log.Logger, dumpCfg *export.Config, args []string) e
 	dumplingFlagSet.Uint64VarP(&dumpCfg.Rows, "rows", "r", dumpCfg.Rows, "Split table into chunks of this many rows, default unlimited")
 	dumplingFlagSet.StringVar(&dumpCfg.Where, "where", dumpCfg.Where, "Dump only selected records")
 	dumplingFlagSet.BoolVar(&dumpCfg.EscapeBackslash, "escape-backslash", dumpCfg.EscapeBackslash, "Use backslash to escape quotation marks")
-	dumplingFlagSet.StringArrayVarP(&filters, "filter", "f", DefaultTableFilter, "Filter to select which tables to dump")
+	dumplingFlagSet.StringArrayVarP(&filters, "filter", "f", []string{"*.*", export.DefaultTableFilter}, "Filter to select which tables to dump")
 	dumplingFlagSet.StringVar(&dumpCfg.Security.CAPath, "ca", dumpCfg.Security.CAPath, "The path name to the certificate authority file for TLS connection")
 	dumplingFlagSet.StringVar(&dumpCfg.Security.CertPath, "cert", dumpCfg.Security.CertPath, "The path name to the client certificate file for TLS connection")
 	dumplingFlagSet.StringVar(&dumpCfg.Security.KeyPath, "key", dumpCfg.Security.KeyPath, "The path name to the client private key file for TLS connection")
@@ -282,7 +284,7 @@ func ParseExtraArgs(logger *log.Logger, dumpCfg *export.Config, args []string) e
 		}
 	}
 
-	if len(tablesList) > 0 || !utils.NonRepeatStringsEqual(DefaultTableFilter, filters) {
+	if len(tablesList) > 0 || !utils.NonRepeatStringsEqual([]string{"*.*", export.DefaultTableFilter}, filters) {
 		ff, err2 := export.ParseTableFilter(tablesList, filters)
 		if err2 != nil {
 			return err2
diff --git a/dm/tests/openapi/client/openapi_task_check b/dm/tests/openapi/client/openapi_task_check
index c8c9b54cafe..0f7e3631c59 100755
--- a/dm/tests/openapi/client/openapi_task_check
+++ b/dm/tests/openapi/client/openapi_task_check
@@ -445,9 +445,9 @@ def create_shard_task_success():
     print("create_shard_task_success resp=", resp.json())
     assert resp.status_code == 201
 
-def create_dump_task_success():
+def create_dump_task_success(task_name):
     task = {
-        "name": "test-dump",
+        "name": task_name,
         "task_mode": "dump",
         "meta_schema": "dm-meta",
         "enhance_online_schema_change": True,
@@ -485,6 +485,38 @@
     print("create_dump_task_success resp=", resp.json())
     assert resp.status_code == 201
 
+
+def create_dump_task_without_table_filter_success(task_name):
+    task = {
+        "name": task_name,
+        "task_mode": "dump",
+        "meta_schema": "dm-meta",
+        "enhance_online_schema_change": True,
+        "on_duplicate": "error",
+        "table_migrate_rule": [],
+        "target_config": {
+            "host": "127.0.0.1",
+            "port": 4000,
+            "user": "root",
+            "password": "",
+        },
+        "source_config": {
+            "full_migrate_conf": {
+                "export_threads": 4,
+                "import_threads": 16,
+                "data_dir": "./exported_data_1",
+                "consistency": "auto",
+            },
+            "incr_migrate_conf": {"repl_threads": 16, "repl_batch": 100},
+            "source_conf": [
+                {"source_name": SOURCE1_NAME},
+            ],
+        }
+    }
+    resp = requests.post(url=API_ENDPOINT, json={"task": task})
+    print("create_dump_task_without_table_filter_success resp=", resp.json())
+    assert resp.status_code == 201
+
 def create_load_task_success():
     task = {
         "name": LOAD_TASK_NAME,
@@ -1039,6 +1071,7 @@ if __name__ == "__main__":
         "create_noshard_task_success": create_noshard_task_success,
         "create_shard_task_success": create_shard_task_success,
         "create_dump_task_success": create_dump_task_success,
+        "create_dump_task_without_table_filter_success": create_dump_task_without_table_filter_success,
        "create_load_task_success": create_load_task_success,
         "create_incremental_task_with_gtid_success": create_incremental_task_with_gtid_success,
         "delete_task_failed": delete_task_failed,
diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh
index 83d6d6e9c6f..f231a000fbb 100644
--- a/dm/tests/openapi/run.sh
+++ b/dm/tests/openapi/run.sh
@@ -189,9 +189,6 @@ function test_dump_and_load_task() {
 	echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: dump & load TASK"
 	prepare_database
 
-	task_name_dump="test-dump"
-	task_name_load="test-load"
-
 	# create source successfully
 	openapi_source_check "create_source1_success"
 	# get source list success
@@ -208,37 +205,49 @@
 	# create task success: not valid task create request
 	openapi_task_check "create_task_failed"
 
-	# create dump task success
-	openapi_task_check "create_dump_task_success"
+	init_dump_data
+
+	dump_task_name="test-dump-1"
+	# create dump task from mysql without table filter success and check stage is "Stopped"
+	openapi_task_check "create_dump_task_without_table_filter_success" $dump_task_name
 	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-		"query-status $task_name_dump" \
+		"query-status $dump_task_name" \
 		"\"stage\": \"Stopped\"" 1
+	# start dump task success and wait dump task finish and delete dump task
+	openapi_task_check "start_task_success" $dump_task_name ""
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status $dump_task_name" 100 \
+		"\"stage\": \"Finished\"" 1
 
-	init_dump_data
-
-	# start dump task success
-	openapi_task_check "start_task_success" $task_name_dump ""
+	dump_task_name="test-dump-2"
+	# create dump task success and check stage is "Stopped"
+	openapi_task_check "create_dump_task_success" $dump_task_name
+	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
+		"query-status $dump_task_name" \
+		"\"stage\": \"Stopped\"" 1
 
-	# wait dump task finish
+	# start dump task success and wait dump task finish
+	openapi_task_check "start_task_success" $dump_task_name ""
 	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-		"query-status $task_name_dump" 100 \
+		"query-status $dump_task_name" 100 \
 		"\"stage\": \"Finished\"" 1
 
-	openapi_task_check "check_dump_task_finished_status_success" $task_name_dump 2 2 4 4 228
+	openapi_task_check "check_dump_task_finished_status_success" $dump_task_name 2 2 4 4 228
 
+	load_task_name="test-load"
 	# create load task success
 	openapi_task_check "create_load_task_success"
 	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-		"query-status $task_name_load" \
+		"query-status $load_task_name" \
 		"\"stage\": \"Stopped\"" 1
 
 	# use the data from the same dir of dump task
 	# start load task success
-	openapi_task_check "start_task_success" $task_name_load ""
+	openapi_task_check "start_task_success" $load_task_name ""
 
 	# wait load task finish
 	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
-		"query-status $task_name_load" 100 \
+		"query-status $load_task_name" 100 \
 		"\"stage\": \"Finished\"" 1
 
 	check_sync_diff $WORK_DIR $cur/conf/diff_config_no_shard_one_source.toml
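
Editor's note (not part of the patch): the sketch below illustrates what the widened default filter does in practice, using the TiDB table-filter package that the `filter`/`tfilter` aliases above refer to. The import path and the inlined value of `export.DefaultTableFilter` are assumptions based on recent TiDB/Dumpling sources, not taken from this diff.

package main

import (
	"fmt"

	// Assumed import path; older TiDB releases ship this package as
	// github.com/pingcap/tidb/util/table-filter instead.
	filter "github.com/pingcap/tidb/pkg/util/table-filter"
)

func main() {
	// The patched DM default: accept every table, then drop system schemas.
	// The second rule stands in for export.DefaultTableFilter (assumed value);
	// the third is the lowercase variant this patch appends, needed when the
	// filter runs case-sensitively (upstream lower_case_table_names=0).
	rules := []string{
		"*.*",
		"!/^(mysql|sys|INFORMATION_SCHEMA|PERFORMANCE_SCHEMA|METRICS_SCHEMA|INSPECTION_SCHEMA)$/.*", // assumed export.DefaultTableFilter
		"!/^(information_schema|performance_schema|metrics_schema|inspection_schema)$/.*",
	}

	f, err := filter.Parse(rules)
	if err != nil {
		panic(err)
	}

	fmt.Println(f.MatchTable("app", "orders"))                // true: user tables are dumped
	fmt.Println(f.MatchTable("information_schema", "tables")) // false: lowercase system schema is skipped
	fmt.Println(f.MatchTable("INFORMATION_SCHEMA", "TABLES")) // false: uppercase variant is skipped too

	// When the upstream is case-insensitive, Dumpling wraps the filter as
	// TestBAlist verifies; schema and table names are lowercased before matching.
	ci := filter.CaseInsensitive(f)
	fmt.Println(ci.MatchTable("Information_Schema", "Tables")) // false
}

This also explains the ParseExtraArgs change: the -f flag default and the "did the user pass a filter?" comparison now use the original two-rule baseline []string{"*.*", export.DefaultTableFilter}, so the widened DM default is applied only in constructArgs when no block-allow-list is set, while explicit user filters are still detected correctly.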