Skip to content

Commit

Permalink
dm: add system table filter default for dump task (#11985)
Browse files Browse the repository at this point in the history
close #11984
  • Loading branch information
River2000i authored Jan 9, 2025
1 parent 8830dc8 commit dec6591
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 24 deletions.
18 changes: 15 additions & 3 deletions dm/dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,21 @@ func (m *Dumpling) constructArgs(ctx context.Context) (*export.Config, error) {
dumpConfig.Password = db.Password
dumpConfig.OutputDirPath = cfg.Dir // use LoaderConfig.Dir as output dir
dumpConfig.CollationCompatible = cfg.CollationCompatible
tableFilter, err := filter.ParseMySQLReplicationRules(cfg.BAList)
if err != nil {
return nil, err
var (
tableFilter filter.Filter
err error
)
if cfg.BAList == nil {
// If no block-allow-list is set, system tables are filtered by default.
tableFilter, err = filter.Parse(dutils.DefaultTableFilter)
if err != nil {
return nil, err
}
} else {
tableFilter, err = filter.ParseMySQLReplicationRules(cfg.BAList)
if err != nil {
return nil, err
}
}
dumpConfig.TableFilter = tableFilter
dumpConfig.CompleteInsert = true // always keep column name in `INSERT INTO` statements.
Expand Down
49 changes: 49 additions & 0 deletions dm/dumpling/dumpling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/dm/config"
"github.com/pingcap/tiflow/dm/pb"
"github.com/pingcap/tiflow/dm/pkg/conn"
dutils "github.com/pingcap/tiflow/dm/pkg/dumpling"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -235,3 +236,51 @@ func genDumpCfg(t *testing.T) *config.SubTaskConfig {
},
}
}

// TestBAlist verifies that Dumpling.Init builds the expected table filter:
// the block-allow-list is parsed when set, the default system-table filter is
// used when it is nil, and the filter is wrapped as case-insensitive when
// CaseSensitive is false.
func TestBAlist(t *testing.T) {
	ctx := context.Background()

	testCases := []struct {
		name          string
		caseSensitive bool
		noBAList      bool
	}{
		{name: "case sensitive with block-allow-list", caseSensitive: true},
		{name: "case insensitive with block-allow-list", caseSensitive: false},
		{name: "case sensitive without block-allow-list", caseSensitive: true, noBAList: true},
		{name: "case insensitive without block-allow-list", caseSensitive: false, noBAList: true},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			cfg := genDumpCfg(t)
			cfg.CaseSensitive = tc.caseSensitive
			if tc.noBAList {
				cfg.BAList = nil
			}
			m := NewDumpling(cfg)
			require.NoError(t, m.Init(ctx))

			// Build the filter we expect Init to have constructed.
			var (
				expected tfilter.Filter
				err      error
			)
			if tc.noBAList {
				// No block-allow-list: system tables are filtered by default.
				expected, err = tfilter.Parse(dutils.DefaultTableFilter)
			} else {
				expected, err = tfilter.ParseMySQLReplicationRules(cfg.BAList)
			}
			require.NoError(t, err)
			if !tc.caseSensitive {
				expected = tfilter.CaseInsensitive(expected)
			}
			require.Equal(t, expected, m.dumpConfig.TableFilter)
		})
	}
}
8 changes: 5 additions & 3 deletions dm/pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
)

// DefaultTableFilter is the default table filter for dumpling.
var DefaultTableFilter = []string{"*.*", export.DefaultTableFilter}
// Different from Dumpling, DM's case sensitivity is determined by the `lower_case_table_names` parameter from upstream,
// so filter both uppercase and lowercase system tables.
var DefaultTableFilter = []string{"*.*", export.DefaultTableFilter, "!/^(information_schema|performance_schema|metrics_schema|inspection_schema)$/.*"}

// ParseMetaData parses mydumper's output meta file and returns binlog location.
// since v2.0.0, dumpling maybe configured to output master status after connection pool is established,
Expand Down Expand Up @@ -252,7 +254,7 @@ func ParseExtraArgs(logger *log.Logger, dumpCfg *export.Config, args []string) e
dumplingFlagSet.Uint64VarP(&dumpCfg.Rows, "rows", "r", dumpCfg.Rows, "Split table into chunks of this many rows, default unlimited")
dumplingFlagSet.StringVar(&dumpCfg.Where, "where", dumpCfg.Where, "Dump only selected records")
dumplingFlagSet.BoolVar(&dumpCfg.EscapeBackslash, "escape-backslash", dumpCfg.EscapeBackslash, "Use backslash to escape quotation marks")
dumplingFlagSet.StringArrayVarP(&filters, "filter", "f", DefaultTableFilter, "Filter to select which tables to dump")
dumplingFlagSet.StringArrayVarP(&filters, "filter", "f", []string{"*.*", export.DefaultTableFilter}, "Filter to select which tables to dump")
dumplingFlagSet.StringVar(&dumpCfg.Security.CAPath, "ca", dumpCfg.Security.CAPath, "The path name to the certificate authority file for TLS connection")
dumplingFlagSet.StringVar(&dumpCfg.Security.CertPath, "cert", dumpCfg.Security.CertPath, "The path name to the client certificate file for TLS connection")
dumplingFlagSet.StringVar(&dumpCfg.Security.KeyPath, "key", dumpCfg.Security.KeyPath, "The path name to the client private key file for TLS connection")
Expand Down Expand Up @@ -282,7 +284,7 @@ func ParseExtraArgs(logger *log.Logger, dumpCfg *export.Config, args []string) e
}
}

if len(tablesList) > 0 || !utils.NonRepeatStringsEqual(DefaultTableFilter, filters) {
if len(tablesList) > 0 || !utils.NonRepeatStringsEqual([]string{"*.*", export.DefaultTableFilter}, filters) {
ff, err2 := export.ParseTableFilter(tablesList, filters)
if err2 != nil {
return err2
Expand Down
37 changes: 35 additions & 2 deletions dm/tests/openapi/client/openapi_task_check
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,9 @@ def create_shard_task_success():
print("create_shard_task_success resp=", resp.json())
assert resp.status_code == 201

def create_dump_task_success():
def create_dump_task_success(task_name):
task = {
"name": "test-dump",
"name": task_name,
"task_mode": "dump",
"meta_schema": "dm-meta",
"enhance_online_schema_change": True,
Expand Down Expand Up @@ -485,6 +485,38 @@ def create_dump_task_success():
print("create_dump_task_success resp=", resp.json())
assert resp.status_code == 201


def create_dump_task_without_table_filter_success(task_name):
    """Create a dump-mode task that sets no table_migrate_rule (no table
    filter) and assert the openapi endpoint accepts it with HTTP 201."""
    source_config = {
        "full_migrate_conf": {
            "export_threads": 4,
            "import_threads": 16,
            "data_dir": "./exported_data_1",
            "consistency": "auto",
        },
        "incr_migrate_conf": {"repl_threads": 16, "repl_batch": 100},
        "source_conf": [{"source_name": SOURCE1_NAME}],
    }
    task = {
        "name": task_name,
        "task_mode": "dump",
        "meta_schema": "dm-meta",
        "enhance_online_schema_change": True,
        "on_duplicate": "error",
        # Empty rule list: no table filter configured for this task.
        "table_migrate_rule": [],
        "target_config": {
            "host": "127.0.0.1",
            "port": 4000,
            "user": "root",
            "password": "",
        },
        "source_config": source_config,
    }
    resp = requests.post(url=API_ENDPOINT, json={"task": task})
    print("create_dump_task_without_table_filter_success resp=", resp.json())
    assert resp.status_code == 201

def create_load_task_success():
task = {
"name": LOAD_TASK_NAME,
Expand Down Expand Up @@ -1039,6 +1071,7 @@ if __name__ == "__main__":
"create_noshard_task_success": create_noshard_task_success,
"create_shard_task_success": create_shard_task_success,
"create_dump_task_success": create_dump_task_success,
"create_dump_task_without_table_filter_success": create_dump_task_without_table_filter_success,
"create_load_task_success": create_load_task_success,
"create_incremental_task_with_gtid_success": create_incremental_task_with_gtid_success,
"delete_task_failed": delete_task_failed,
Expand Down
41 changes: 25 additions & 16 deletions dm/tests/openapi/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,6 @@ function test_dump_and_load_task() {
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: dump & load TASK"
prepare_database

task_name_dump="test-dump"
task_name_load="test-load"

# create source successfully
openapi_source_check "create_source1_success"
# get source list success
Expand All @@ -208,37 +205,49 @@ function test_dump_and_load_task() {
# create task success: not valid task create request
openapi_task_check "create_task_failed"

# create dump task success
openapi_task_check "create_dump_task_success"
init_dump_data

dump_task_name="test-dump-1"
# create dump task from mysql without table filter successfully and validate that the stage is "Stopped"
openapi_task_check "create_dump_task_without_table_filter_success" $dump_task_name
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $task_name_dump" \
"query-status $dump_task_name" \
"\"stage\": \"Stopped\"" 1
# start dump task success and wait dump task finish and delete dump task
openapi_task_check "start_task_success" $dump_task_name ""
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $dump_task_name" 100 \
"\"stage\": \"Finished\"" 1

init_dump_data

# start dump task success
openapi_task_check "start_task_success" $task_name_dump ""
dump_task_name="test-dump-2"
# create dump task successfully and validate that the stage is "Stopped"
openapi_task_check "create_dump_task_success" $dump_task_name
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $dump_task_name" \
"\"stage\": \"Stopped\"" 1

# wait dump task finish
# start dump task success and wait dump task finish
openapi_task_check "start_task_success" $dump_task_name ""
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $task_name_dump" 100 \
"query-status $dump_task_name" 100 \
"\"stage\": \"Finished\"" 1
openapi_task_check "check_dump_task_finished_status_success" $task_name_dump 2 2 4 4 228
openapi_task_check "check_dump_task_finished_status_success" $dump_task_name 2 2 4 4 228

load_task_name="test-load"
# create load task success
openapi_task_check "create_load_task_success"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $task_name_load" \
"query-status $load_task_name" \
"\"stage\": \"Stopped\"" 1

# use the data from the same dir of dump task

# start load task success
openapi_task_check "start_task_success" $task_name_load ""
openapi_task_check "start_task_success" $load_task_name ""

# wait load task finish
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status $task_name_load" 100 \
"query-status $load_task_name" 100 \
"\"stage\": \"Finished\"" 1

check_sync_diff $WORK_DIR $cur/conf/diff_config_no_shard_one_source.toml
Expand Down

0 comments on commit dec6591

Please sign in to comment.