diff --git a/.github/workflows/dm_binlog_999999.yaml b/.github/workflows/dm_binlog_999999.yaml index 4e4da15f755..be8f3373780 100644 --- a/.github/workflows/dm_binlog_999999.yaml +++ b/.github/workflows/dm_binlog_999999.yaml @@ -64,8 +64,6 @@ jobs: sudo apt-get update sudo apt-get install -y curl sudo docker-compose -f ./dm/tests/binlog_999999/docker-compose.yml up -d - curl http://download.pingcap.org/tidb-enterprise-tools-nightly-linux-amd64.tar.gz | tar xz - mv tidb-enterprise-tools-nightly-linux-amd64/bin/sync_diff_inspector bin/ curl http://download.pingcap.org/tidb-nightly-linux-amd64.tar.gz | tar xz mv tidb-nightly-linux-amd64/bin/tidb-server bin/ curl -O https://dl.min.io/server/minio/release/linux-amd64/minio diff --git a/Makefile b/Makefile index 63ce12fbfaf..b8f93fe04bd 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ mysql_docker_integration_test mysql_docker_integration_test_with_build \ build_mysql_integration_test_images clean_integration_test_images \ dm dm-master dm-worker dmctl dm-syncer dm_coverage \ + sync-diff-inspector \ engine tiflow tiflow-demo tiflow-chaos-case engine_image help \ format-makefiles check-makefiles oauth2_server prepare_test_binaries @@ -219,14 +220,13 @@ check_third_party_binary: @which bin/pd-server @which bin/tiflash @which bin/pd-ctl - @which bin/sync_diff_inspector @which bin/go-ycsb @which bin/etcdctl @which bin/jq @which bin/minio @which bin/bin/schema-registry-start -integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer pulsar_consumer oauth2_server +integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer pulsar_consumer oauth2_server sync-diff-inspector $(FAILPOINT_ENABLE) $(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \ -coverpkg=github.com/pingcap/tiflow/... \ @@ -253,7 +253,7 @@ build_mysql_integration_test_images: ## Build MySQL integration test images with build_mysql_integration_test_images: clean_integration_test_containers docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-mysql-integration.yml build --no-cache -integration_test_kafka: check_third_party_binary +integration_test_kafka: check_third_party_binary sync-diff-inspector tests/integration_tests/run.sh kafka "$(CASE)" "$(START_AT)" integration_test_storage: @@ -385,6 +385,9 @@ clean: rm -rf tools/bin rm -rf tools/include +sync-diff-inspector: + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/sync_diff_inspector ./sync_diff_inspector + dm: dm-master dm-worker dmctl dm-syncer dm-master: @@ -443,7 +446,7 @@ dm_unit_test_in_verify_ci: check_failpoint_ctl tools/bin/gotestsum tools/bin/goc tools/bin/gocov convert "$(DM_TEST_DIR)/cov.unit_test.out" | tools/bin/gocov-xml > dm-coverage.xml $(FAILPOINT_DISABLE) -dm_integration_test_build: check_failpoint_ctl +dm_integration_test_build: check_failpoint_ctl sync-diff-inspector $(FAILPOINT_ENABLE) $(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \ -coverpkg=github.com/pingcap/tiflow/dm/... \ @@ -495,9 +498,8 @@ install_test_python_dep: @echo "install python requirments for test" pip install --user -q -r ./dm/tests/requirements.txt -check_third_party_binary_for_dm: +check_third_party_binary_for_dm : sync-diff-inspector @which bin/tidb-server - @which bin/sync_diff_inspector @which mysql @which bin/minio @@ -554,7 +556,7 @@ tiflow-chaos-case: engine_unit_test: check_failpoint_ctl $(call run_engine_unit_test,$(ENGINE_PACKAGES)) -engine_integration_test: check_third_party_binary_for_engine +engine_integration_test: check_third_party_binary_for_engine sync-diff-inspector mkdir -p /tmp/tiflow_engine_test || true ./engine/test/integration_tests/run.sh "$(CASE)" "$(START_AT)" 2>&1 | tee /tmp/tiflow_engine_test/engine_it.log ./engine/test/utils/check_log.sh @@ -566,7 +568,6 @@ check_third_party_binary_for_engine: @which mysql || (echo "mysql not found in ${PATH}"; exit 1) @which jq || (echo "jq not found in ${PATH}"; exit 1) @which mc || (echo "mc not found in ${PATH}, you can use 'make bin/mc' and move bin/mc to ${PATH}"; exit 1) - @which bin/sync_diff_inspector || (echo "run 'make bin/sync_diff_inspector' to download it if you need") check_engine_integration_test: ./engine/test/utils/check_case.sh @@ -581,9 +582,6 @@ check_cdc_integration_test: bin/mc: ./scripts/download-mc.sh -bin/sync_diff_inspector: - ./scripts/download-sync-diff.sh - define run_engine_unit_test @echo "running unit test for packages:" $(1) mkdir -p $(ENGINE_TEST_DIR) diff --git a/dm/tests/README.md b/dm/tests/README.md index f72fe191fee..bcdf3297be9 100644 --- a/dm/tests/README.md +++ b/dm/tests/README.md @@ -4,7 +4,7 @@ 1. The following executables must be copied or generated or linked into these locations. * `bin/tidb-server` can be downloaded from [tidb-master-linux-amd64](https://download.pingcap.org/tidb-master-linux-amd64.tar.gz) or installed by [tiup](https://github.com/pingcap/tiup), you can use the command `find ~/.tiup -name tidb-server` to locate `tidb-server` binary file and copy it - * `bin/sync_diff_inspector` # can be downloaded from [tidb-enterprise-tools-latest-linux-amd64](http://download.pingcap.org/tidb-enterprise-tools-latest-linux-amd64.tar.gz) or build from [source code](https://github.com/pingcap/tidb-tools) + * `bin/sync_diff_inspector` # generated by `make dm_integration_test_build` * `bin/minio` can be build from (https://github.com/minio/minio) * `bin/dm-master.test` # generated by `make dm_integration_test_build` * `bin/dm-worker.test` # generated by `make dm_integration_test_build` diff --git a/dm/tests/download-compatibility-test-binaries.sh b/dm/tests/download-compatibility-test-binaries.sh index df29fa3e303..e71a740ccda 100755 --- a/dm/tests/download-compatibility-test-binaries.sh +++ b/dm/tests/download-compatibility-test-binaries.sh @@ -77,7 +77,6 @@ main() { # Define download URLs local download_urls=( "${FILE_SERVER_URL}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz" - "http://download.pingcap.org/tidb-enterprise-tools-nightly-linux-amd64.tar.gz" "http://download.pingcap.org/tidb-enterprise-tools-latest-linux-amd64.tar.gz" "${GITHUB_RELEASE_URL}/gh-ost-binary-linux-20200828140552.tar.gz" "${FILE_SERVER_URL}/download/minio.tar.gz" @@ -98,11 +97,6 @@ main() { extract "$filename" "$THIRD_BIN_DIR" "bin/tidb-server" mv "${THIRD_BIN_DIR}/bin/tidb-server" "$THIRD_BIN_DIR/" ;; - tidb-enterprise-tools-nightly-linux-amd64.tar.gz) - extract "$filename" "$THIRD_BIN_DIR" "tidb-enterprise-tools-nightly-linux-amd64/bin/sync_diff_inspector" - mv "${THIRD_BIN_DIR}/tidb-enterprise-tools-nightly-linux-amd64/bin/sync_diff_inspector" "$THIRD_BIN_DIR/" - rm -rf "${THIRD_BIN_DIR}/tidb-enterprise-tools-nightly-linux-amd64" - ;; tidb-enterprise-tools-latest-linux-amd64.tar.gz) extract "$filename" "$THIRD_BIN_DIR" "tidb-enterprise-tools-latest-linux-amd64/bin/mydumper" mv "${THIRD_BIN_DIR}/tidb-enterprise-tools-latest-linux-amd64/bin/mydumper" "$THIRD_BIN_DIR/" diff --git a/dm/tests/download-integration-test-binaries.sh b/dm/tests/download-integration-test-binaries.sh index 6dd88b767a4..525f517e5cb 100755 --- a/dm/tests/download-integration-test-binaries.sh +++ b/dm/tests/download-integration-test-binaries.sh @@ -70,14 +70,12 @@ main() { local tidb_sha1=$(get_sha1 "tidb" "$tidb_branch") local tikv_sha1=$(get_sha1 "tikv" "$tikv_branch") local pd_sha1=$(get_sha1 "pd" "$pd_branch") - local tidb_tools_sha1=$(curl "${FILE_SERVER_URL}/download/refs/pingcap/tidb-tools/master/sha1") # Define download URLs local download_urls=( "${FILE_SERVER_URL}/download/builds/pingcap/tidb/${tidb_sha1}/centos7/tidb-server.tar.gz" "${FILE_SERVER_URL}/download/builds/pingcap/tikv/${tikv_sha1}/centos7/tikv-server.tar.gz" "${FILE_SERVER_URL}/download/builds/pingcap/pd/${pd_sha1}/centos7/pd-server.tar.gz" - "${FILE_SERVER_URL}/download/builds/pingcap/tidb-tools/${tidb_tools_sha1}/centos7/tidb-tools.tar.gz" "${GITHUB_RELEASE_URL}/gh-ost-binary-linux-20200828140552.tar.gz" "${FILE_SERVER_URL}/download/minio.tar.gz" ) @@ -105,10 +103,6 @@ main() { tar -xz -C "$THIRD_BIN_DIR" bin/tikv-server -f "${TEMP_DIR}/${filename}" mv "${THIRD_BIN_DIR}/bin/tikv-server" "$THIRD_BIN_DIR/" ;; - tidb-tools.tar.gz) - tar -xz -C "$THIRD_BIN_DIR" 'bin/sync_diff_inspector' -f "${TEMP_DIR}/${filename}" - mv "${THIRD_BIN_DIR}/bin/sync_diff_inspector" "$THIRD_BIN_DIR/" - ;; minio.tar.gz | gh-ost-binary-linux-20200828140552.tar.gz) tar -xz -C "$THIRD_BIN_DIR" -f "${TEMP_DIR}/${filename}" ;; diff --git a/dm/tests/mariadb_master_down_and_up/case.sh b/dm/tests/mariadb_master_down_and_up/case.sh index f3b5a5c5735..b89deca187d 100644 --- a/dm/tests/mariadb_master_down_and_up/case.sh +++ b/dm/tests/mariadb_master_down_and_up/case.sh @@ -107,7 +107,6 @@ function clean_task() { function test_master_down_and_up() { cleanup_process clean_data - install_sync_diff setup_replica gen_full_data run_dm_components_and_create_source $1 diff --git a/dm/tests/mariadb_master_down_and_up/lib.sh b/dm/tests/mariadb_master_down_and_up/lib.sh index 3d38de273e7..4a548c73425 100644 --- a/dm/tests/mariadb_master_down_and_up/lib.sh +++ b/dm/tests/mariadb_master_down_and_up/lib.sh @@ -27,12 +27,6 @@ function exec_tidb() { echo $2 | mysql -uroot -h127.0.0.1 -P$1 } -function install_sync_diff() { - curl https://download.pingcap.org/tidb-enterprise-tools-nightly-linux-amd64.tar.gz | tar xz - mkdir -p bin - mv tidb-enterprise-tools-nightly-linux-amd64/bin/sync_diff_inspector bin/ -} - function get_master_status() { arr=$(echo "show master status;" | MYSQL_PWD=123456 mysql -uroot -h127.0.0.1 -P3306 | awk 'NR==2') echo $arr diff --git a/dm/tests/tiup/lib.sh b/dm/tests/tiup/lib.sh index 8b57d9355e7..441fd2da753 100755 --- a/dm/tests/tiup/lib.sh +++ b/dm/tests/tiup/lib.sh @@ -56,12 +56,6 @@ function run_sql_tidb_with_retry() { fi } -function install_sync_diff() { - curl https://download.pingcap.org/tidb-enterprise-tools-nightly-linux-amd64.tar.gz | tar xz - mkdir -p bin - mv tidb-enterprise-tools-nightly-linux-amd64/bin/sync_diff_inspector bin/ -} - function exec_full_stage() { # drop previous data exec_sql mysql1 3306 "DROP DATABASE IF EXISTS $DB1;" diff --git a/dm/tests/tiup/upgrade-from-v1.sh b/dm/tests/tiup/upgrade-from-v1.sh index 3520dd0f7b9..75b4244efb0 100755 --- a/dm/tests/tiup/upgrade-from-v1.sh +++ b/dm/tests/tiup/upgrade-from-v1.sh @@ -122,8 +122,6 @@ function destroy_v2_by_tiup() { } function test() { - install_sync_diff - deploy_v1_by_ansible migrate_in_v1 diff --git a/dm/tests/tiup/upgrade-from-v2.sh b/dm/tests/tiup/upgrade-from-v2.sh index f5781c3002c..1a1252e94b2 100755 --- a/dm/tests/tiup/upgrade-from-v2.sh +++ b/dm/tests/tiup/upgrade-from-v2.sh @@ -170,8 +170,6 @@ function destroy_v2_by_tiup() { } function test() { - install_sync_diff - deploy_previous_v2 migrate_in_previous_v2 diff --git a/dm/tests/tiup/upgrade-tidb.sh b/dm/tests/tiup/upgrade-tidb.sh index 434c74cc7a9..1207e512f27 100755 --- a/dm/tests/tiup/upgrade-tidb.sh +++ b/dm/tests/tiup/upgrade-tidb.sh @@ -52,8 +52,6 @@ function destroy_v2_by_tiup() { # run this before upgrade TiDB. function before_upgrade() { - install_sync_diff - deploy_dm migrate_before_upgrade diff --git a/dm/tests/upstream_switch/case.sh b/dm/tests/upstream_switch/case.sh index 82f04026b22..bae4111e3a5 100644 --- a/dm/tests/upstream_switch/case.sh +++ b/dm/tests/upstream_switch/case.sh @@ -209,7 +209,6 @@ function check_master() { function test_relay() { cleanup_process check_master - install_sync_diff clean_data prepare_binlogs setup_replica diff --git a/dm/tests/upstream_switch/lib.sh b/dm/tests/upstream_switch/lib.sh index 65064fb4cb6..b11537d988f 100644 --- a/dm/tests/upstream_switch/lib.sh +++ b/dm/tests/upstream_switch/lib.sh @@ -30,12 +30,6 @@ function exec_tidb() { echo $2 | mysql -uroot -h$1 -P4000 } -function install_sync_diff() { - curl https://download.pingcap.org/tidb-enterprise-tools-nightly-linux-amd64.tar.gz | tar xz - mkdir -p bin - mv tidb-enterprise-tools-nightly-linux-amd64/bin/sync_diff_inspector bin/ -} - function prepare_more_binlogs() { exec_sql $1 "create database db1 collate latin1_bin;" exec_sql $1 "flush logs;" diff --git a/scripts/download-integration-test-binaries.sh b/scripts/download-integration-test-binaries.sh index 765d848aede..ec0d8849438 100755 --- a/scripts/download-integration-test-binaries.sh +++ b/scripts/download-integration-test-binaries.sh @@ -91,7 +91,7 @@ download_community_binaries() { mv ${THIRD_BIN_DIR}/tiflash ${THIRD_BIN_DIR}/_tiflash mv ${THIRD_BIN_DIR}/_tiflash/* ${THIRD_BIN_DIR} && rm -rf ${THIRD_BIN_DIR}/_tiflash tar -xz -C ${THIRD_BIN_DIR} pd-ctl -f ${TMP_DIR}/$tidb_file_name/ctl-${dist}.tar.gz - tar -xz -C ${THIRD_BIN_DIR} $toolkit_file_name/etcdctl $toolkit_file_name/sync_diff_inspector -f ${TMP_DIR}/$toolkit_tar_name + tar -xz -C ${THIRD_BIN_DIR} $toolkit_file_name/etcdctl -f ${TMP_DIR}/$toolkit_tar_name mv ${THIRD_BIN_DIR}/$toolkit_file_name/* ${THIRD_BIN_DIR} && rm -rf ${THIRD_BIN_DIR}/$toolkit_file_name # Download additional tools @@ -147,7 +147,6 @@ download_binaries() { local minio_download_url="${FILE_SERVER_URL}/download/minio.tar.gz" local go_ycsb_download_url="${FILE_SERVER_URL}/download/builds/pingcap/go-ycsb/test-br/go-ycsb" local etcd_download_url="${FILE_SERVER_URL}/download/builds/pingcap/cdc/etcd-v3.4.7-linux-amd64.tar.gz" - local sync_diff_inspector_url="${FILE_SERVER_URL}/download/builds/pingcap/cdc/sync_diff_inspector_hash-a129f096_linux-amd64.tar.gz" local jq_download_url="${FILE_SERVER_URL}/download/builds/pingcap/test/jq-1.6/jq-linux64" local schema_registry_url="${FILE_SERVER_URL}/download/builds/pingcap/cdc/schema-registry.tar.gz" @@ -158,7 +157,6 @@ download_binaries() { download_and_extract "$tiflash_download_url" "tiflash.tar.gz" download_and_extract "$minio_download_url" "minio.tar.gz" download_and_extract "$etcd_download_url" "etcd.tar.gz" "etcd-v3.4.7-linux-amd64/etcdctl" - download_and_extract "$sync_diff_inspector_url" "sync_diff_inspector.tar.gz" download_and_extract "$schema_registry_url" "schema-registry.tar.gz" download_file "$go_ycsb_download_url" "go-ycsb" "${THIRD_BIN_DIR}/go-ycsb" diff --git a/scripts/download-sync-diff.sh b/scripts/download-sync-diff.sh deleted file mode 100755 index 3ee26c6e505..00000000000 --- a/scripts/download-sync-diff.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env bash -# Copyright 2022 PingCAP, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# See the License for the specific language governing permissions and -# limitations under the License. - -set -eu - -echo "will download tidb-tools v6.1.0 to get sync_diff_inspector" -curl -C - --retry 3 -o /tmp/tidb-tools.tar.gz https://download.pingcap.org/tidb-community-toolkit-v6.1.0-linux-amd64.tar.gz -mkdir -p /tmp/tidb-tools -tar -zxf /tmp/tidb-tools.tar.gz -C /tmp/tidb-tools -mv /tmp/tidb-tools/tidb-community-toolkit-v6.1.0-linux-amd64/sync_diff_inspector ./bin/sync_diff_inspector -rm -r /tmp/tidb-tools -rm /tmp/tidb-tools.tar.gz diff --git a/sync_diff_inspector/checkpoints/checkpoints.go b/sync_diff_inspector/checkpoints/checkpoints.go index ab98187eddf..a16cc76c20e 100644 --- a/sync_diff_inspector/checkpoints/checkpoints.go +++ b/sync_diff_inspector/checkpoints/checkpoints.go @@ -211,7 +211,7 @@ func (cp *Checkpoint) Init() { Nodes: make([]*Node, 0), CurrentSavedNode: &Node{ ChunkRange: &chunk.Range{ - Index: chunk.GetInitCID(), + Index: chunk.GetInitChunkID(), IsFirst: true, IsLast: true, }, diff --git a/sync_diff_inspector/chunk/chunk.go b/sync_diff_inspector/chunk/chunk.go index 91ef5f27d7a..5e56391d516 100644 --- a/sync_diff_inspector/chunk/chunk.go +++ b/sync_diff_inspector/chunk/chunk.go @@ -68,8 +68,8 @@ type CID struct { ChunkCnt int `json:"chunk-count"` } -// GetInitCID return an empty CID -func GetInitCID() *CID { +// GetInitChunkID return an empty CID +func GetInitChunkID() *CID { return &CID{ TableIndex: -1, BucketIndexLeft: -1, diff --git a/sync_diff_inspector/diff/diff.go b/sync_diff_inspector/diff/diff.go index 2630897351f..4fa6b8aaf78 100644 --- a/sync_diff_inspector/diff/diff.go +++ b/sync_diff_inspector/diff/diff.go @@ -229,7 +229,7 @@ func (df *Diff) initCheckpoint() error { return nil } -func encodeConfig(config *report.Config) ([]byte, error) { +func encodeReportConfig(config *report.Config) ([]byte, error) { buf := new(bytes.Buffer) if err := toml.NewEncoder(buf).Encode(config); err != nil { return nil, errors.Trace(err) @@ -261,12 +261,12 @@ func getConfigsForReport(cfg *config.Config) ([][]byte, []byte, error) { sourceBytes := make([][]byte, len(sourceConfigs)) var err error for i := range sourceBytes { - sourceBytes[i], err = encodeConfig(sourceConfigs[i]) + sourceBytes[i], err = encodeReportConfig(sourceConfigs[i]) if err != nil { return nil, nil, errors.Trace(err) } } - targetBytes, err := encodeConfig(targetConfig) + targetBytes, err := encodeReportConfig(targetConfig) if err != nil { return nil, nil, errors.Trace(err) } @@ -863,7 +863,7 @@ func (df *Diff) removeSQLFiles(checkPointID *chunk.CID) error { if len(fileIDSubstrs) != 3 { return nil } - tableIndex, bucketIndexLeft, bucketIndexRight, chunkIndex, err := utils.GetCIDFromSQLFileName(fileIDSubstrs[2]) + tableIndex, bucketIndexLeft, bucketIndexRight, chunkIndex, err := utils.GetChunkIDFromSQLFileName(fileIDSubstrs[2]) if err != nil { return errors.Trace(err) } diff --git a/sync_diff_inspector/report/report_test.go b/sync_diff_inspector/report/report_test.go index 6d36ffeb214..c5addaad7da 100644 --- a/sync_diff_inspector/report/report_test.go +++ b/sync_diff_inspector/report/report_test.go @@ -24,10 +24,10 @@ import ( "github.com/BurntSushi/toml" "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" "github.com/pingcap/tiflow/sync_diff_inspector/chunk" "github.com/pingcap/tiflow/sync_diff_inspector/config" "github.com/pingcap/tiflow/sync_diff_inspector/source/common" + "github.com/pingcap/tiflow/sync_diff_inspector/utils" "github.com/stretchr/testify/require" ) @@ -45,10 +45,10 @@ func TestReport(t *testing.T) { report := NewReport(task) createTableSQL1 := "create table `test`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo1, err := dbutiltest.GetTableInfoBySQL(createTableSQL1, parser.New()) + tableInfo1, err := utils.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) createTableSQL2 := "create table `atest`.`atbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo2, err := dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err := utils.GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) tableDiffs := []*common.TableDiff{ @@ -162,7 +162,7 @@ func TestCalculateTotal(t *testing.T) { report := NewReport(task) createTableSQL := "create table `test`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) tableDiffs := []*common.TableDiff{ @@ -209,7 +209,7 @@ func TestCalculateTotal(t *testing.T) { func TestPrint(t *testing.T) { report := NewReport(task) createTableSQL := "create table `test`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) tableDiffs := []*common.TableDiff{ @@ -275,13 +275,13 @@ func TestPrint(t *testing.T) { func TestGetSnapshot(t *testing.T) { report := NewReport(task) createTableSQL1 := "create table `test`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo1, err := dbutiltest.GetTableInfoBySQL(createTableSQL1, parser.New()) + tableInfo1, err := utils.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) createTableSQL2 := "create table `atest`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo2, err := dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err := utils.GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) createTableSQL3 := "create table `xtest`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo3, err := dbutiltest.GetTableInfoBySQL(createTableSQL3, parser.New()) + tableInfo3, err := utils.GetTableInfoBySQL(createTableSQL3, parser.New()) require.NoError(t, err) tableDiffs := []*common.TableDiff{ @@ -392,16 +392,16 @@ func TestCommitSummary(t *testing.T) { outputDir := "./" report := NewReport(&config.TaskConfig{OutputDir: outputDir, FixDir: task.FixDir}) createTableSQL1 := "create table `test`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo1, err := dbutiltest.GetTableInfoBySQL(createTableSQL1, parser.New()) + tableInfo1, err := utils.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) createTableSQL2 := "create table `atest`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo2, err := dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err := utils.GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) createTableSQL3 := "create table `xtest`.`tbl`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo3, err := dbutiltest.GetTableInfoBySQL(createTableSQL3, parser.New()) + tableInfo3, err := utils.GetTableInfoBySQL(createTableSQL3, parser.New()) require.NoError(t, err) createTableSQL4 := "create table `xtest`.`tb1`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo4, err := dbutiltest.GetTableInfoBySQL(createTableSQL4, parser.New()) + tableInfo4, err := utils.GetTableInfoBySQL(createTableSQL4, parser.New()) require.NoError(t, err) tableDiffs := []*common.TableDiff{ { diff --git a/sync_diff_inspector/source/chunks_iter.go b/sync_diff_inspector/source/chunks_iter.go index 1aad42fc90c..114f6125e57 100644 --- a/sync_diff_inspector/source/chunks_iter.go +++ b/sync_diff_inspector/source/chunks_iter.go @@ -28,9 +28,6 @@ import ( // ChunksIterator is used for single mysql/tidb source. type ChunksIterator struct { - ctx context.Context - cancel context.CancelFunc - ID *chunk.CID tableAnalyzer TableAnalyzer @@ -39,7 +36,8 @@ type ChunksIterator struct { errCh chan error splitThreadCount int - pool *utils.WorkerPool + cancel context.CancelFunc + pool *utils.WorkerPool } // NewChunksIterator returns a new iterator @@ -52,9 +50,6 @@ func NewChunksIterator( ) (*ChunksIterator, error) { ctxx, cancel := context.WithCancel(ctx) iter := &ChunksIterator{ - ctx: ctxx, - cancel: cancel, - splitThreadCount: splitThreadCount, tableAnalyzer: analyzer, TableDiffs: tableDiffs, @@ -62,18 +57,15 @@ func NewChunksIterator( // reserve 30 capacity for each goroutine on average chunksCh: make(chan *splitter.RangeInfo, 30*splitThreadCount), errCh: make(chan error, len(tableDiffs)), + cancel: cancel, pool: utils.NewWorkerPool(uint(splitThreadCount), "chunks producer"), } - go iter.produceChunks(startRange) + go iter.produceChunks(ctxx, startRange) return iter, nil } -func (t *ChunksIterator) produceChunks(startRange *splitter.RangeInfo) { - defer func() { - t.pool.WaitFinished() - close(t.chunksCh) - }() - +func (t *ChunksIterator) produceChunks(ctx context.Context, startRange *splitter.RangeInfo) { + defer close(t.chunksCh) nextTableIndex := 0 // If chunkRange @@ -81,11 +73,10 @@ func (t *ChunksIterator) produceChunks(startRange *splitter.RangeInfo) { curIndex := startRange.GetTableIndex() curTable := t.TableDiffs[curIndex] nextTableIndex = curIndex + 1 - // if this chunk is empty, data-check for this table should be skipped if startRange.ChunkRange.Type != chunk.Empty { t.pool.Apply(func() { - chunkIter, err := t.tableAnalyzer.AnalyzeSplitter(t.ctx, curTable, startRange) + chunkIter, err := t.tableAnalyzer.AnalyzeSplitter(ctx, curTable, startRange) if err != nil { t.errCh <- errors.Trace(err) return @@ -102,7 +93,7 @@ func (t *ChunksIterator) produceChunks(startRange *splitter.RangeInfo) { } c.Index.TableIndex = curIndex select { - case <-t.ctx.Done(): + case <-ctx.Done(): log.Info("Stop do produce chunks by context done") return case t.chunksCh <- &splitter.RangeInfo{ @@ -125,7 +116,7 @@ func (t *ChunksIterator) produceChunks(startRange *splitter.RangeInfo) { progressID := dbutil.TableName(table.Schema, table.Table) progress.StartTable(progressID, 1, true) select { - case <-t.ctx.Done(): + case <-ctx.Done(): log.Info("Stop do produce chunks by context done") return case t.chunksCh <- &splitter.RangeInfo{ @@ -146,7 +137,7 @@ func (t *ChunksIterator) produceChunks(startRange *splitter.RangeInfo) { t.pool.Apply(func() { table := t.TableDiffs[curTableIndex] - chunkIter, err := t.tableAnalyzer.AnalyzeSplitter(t.ctx, table, nil) + chunkIter, err := t.tableAnalyzer.AnalyzeSplitter(ctx, table, nil) if err != nil { t.errCh <- errors.Trace(err) return @@ -163,7 +154,7 @@ func (t *ChunksIterator) produceChunks(startRange *splitter.RangeInfo) { } c.Index.TableIndex = curTableIndex select { - case <-t.ctx.Done(): + case <-ctx.Done(): log.Info("Stop do produce chunks by context done") return case t.chunksCh <- &splitter.RangeInfo{ @@ -175,6 +166,7 @@ func (t *ChunksIterator) produceChunks(startRange *splitter.RangeInfo) { } }) } + t.pool.WaitFinished() } // Next returns the next chunk diff --git a/sync_diff_inspector/source/common/common_test.go b/sync_diff_inspector/source/common/common_test.go index 5649bbf347d..6c83caee099 100644 --- a/sync_diff_inspector/source/common/common_test.go +++ b/sync_diff_inspector/source/common/common_test.go @@ -19,14 +19,13 @@ import ( "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/util/dbutil" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" "github.com/pingcap/tiflow/sync_diff_inspector/utils" "github.com/stretchr/testify/require" ) func TestRowData(t *testing.T) { createTableSQL := "create table test.test(id int(24), name varchar(24), age int(24), primary key(id, name));" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) _, orderKeyCols := dbutil.SelectUniqueOrderKey(tableInfo) diff --git a/sync_diff_inspector/source/source_test.go b/sync_diff_inspector/source/source_test.go index bc69832547d..d55359bf95b 100644 --- a/sync_diff_inspector/source/source_test.go +++ b/sync_diff_inspector/source/source_test.go @@ -22,12 +22,12 @@ import ( "regexp" "strconv" "testing" + "time" "github.com/DATA-DOG/go-sqlmock" _ "github.com/go-sql-driver/mysql" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/util/dbutil" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" filter "github.com/pingcap/tidb/pkg/util/table-filter" router "github.com/pingcap/tidb/pkg/util/table-router" "github.com/pingcap/tiflow/sync_diff_inspector/chunk" @@ -293,7 +293,7 @@ func TestFallbackToRandomIfRangeIsSet(t *testing.T) { "`c` char(120) NOT NULL DEFAULT '', " + "PRIMARY KEY (`id`), KEY `k_1` (`k`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL1, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) table1 := &common.TableDiff{ @@ -518,6 +518,9 @@ func TestMysqlRouter(t *testing.T) { require.NoError(t, err) rangeIter.Close() + // Wait goroutine quits to avoid data race + time.Sleep(time.Second) + // row Iterator dataRows := sqlmock.NewRows(tableCases[0].rowColumns) for k := 0; k < 2; k++ { @@ -625,7 +628,7 @@ func TestTiDBRouter(t *testing.T) { func prepareTiDBTables(t *testing.T, tableCases []*tableCaseType) []*common.TableDiff { tableDiffs := make([]*common.TableDiff, 0, len(tableCases)) for n, tableCase := range tableCases { - tableInfo, err := dbutiltest.GetTableInfoBySQL(tableCase.createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(tableCase.createTableSQL, parser.New()) require.NoError(t, err) tableDiffs = append(tableDiffs, &common.TableDiff{ Schema: "source_test", diff --git a/sync_diff_inspector/splitter/bucket.go b/sync_diff_inspector/splitter/bucket.go index 37fd765729b..96e6c310da1 100644 --- a/sync_diff_inspector/splitter/bucket.go +++ b/sync_diff_inspector/splitter/bucket.go @@ -16,6 +16,7 @@ package splitter import ( "context" "database/sql" + "sync" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -34,14 +35,12 @@ const DefaultChannelBuffer = 1024 // BucketIterator is struct for bucket iterator type BucketIterator struct { - ctx context.Context - cancel context.CancelFunc - buckets []dbutil.Bucket table *common.TableDiff indexColumns []*model.ColumnInfo chunkPool *utils.WorkerPool + wg sync.WaitGroup // control for one bucket in shared chunkPool chunkSize int64 chunks []*chunk.Range @@ -49,6 +48,7 @@ type BucketIterator struct { chunksCh chan []*chunk.Range errCh chan error + cancel context.CancelFunc indexID int64 progressID string @@ -75,16 +75,14 @@ func NewBucketIteratorWithCheckpoint( table.Range) } - ctx, cancel := context.WithCancel(ctx) + bctx, cancel := context.WithCancel(ctx) bs := &BucketIterator{ - ctx: ctx, - cancel: cancel, - table: table, chunkPool: bucketSpliterPool, chunkSize: table.ChunkSize, chunksCh: make(chan []*chunk.Range, DefaultChannelBuffer), errCh: make(chan error, 1), + cancel: cancel, dbConn: dbConn, progressID: progressID, @@ -96,7 +94,7 @@ func NewBucketIteratorWithCheckpoint( // Let the progress bar begins to record the table. progress.StartTable(bs.progressID, 0, false) - go bs.produceChunks(startRange) + go bs.produceChunks(bctx, startRange) return bs, nil } @@ -230,19 +228,21 @@ NEXTINDEX: // Close closes the iterator func (s *BucketIterator) Close() { s.cancel() - s.chunkPool.WaitFinished() } func (s *BucketIterator) splitChunkForBucket( + ctx context.Context, firstBucketID, lastBucketID, beginIndex int, bucketChunkCnt, splitChunkCnt int, chunkRange *chunk.Range, ) { + s.wg.Add(1) s.chunkPool.Apply(func() { - chunks, err := splitRangeByRandom(s.ctx, s.dbConn, chunkRange, splitChunkCnt, s.table.Schema, s.table.Table, s.indexColumns, s.table.Range, s.table.Collation) + defer s.wg.Done() + chunks, err := splitRangeByRandom(ctx, s.dbConn, chunkRange, splitChunkCnt, s.table.Schema, s.table.Table, s.indexColumns, s.table.Range, s.table.Collation) if err != nil { select { - case <-s.ctx.Done(): + case <-ctx.Done(): case s.errCh <- errors.Trace(err): } return @@ -253,11 +253,11 @@ func (s *BucketIterator) splitChunkForBucket( }) } -func (s *BucketIterator) produceChunks(startRange *RangeInfo) { +func (s *BucketIterator) produceChunks(ctx context.Context, startRange *RangeInfo) { defer func() { - s.chunkPool.WaitFinished() - close(s.chunksCh) + s.wg.Wait() progress.UpdateTotal(s.progressID, 0, true) + close(s.chunksCh) }() var ( lowerValues, upperValues []string @@ -277,7 +277,7 @@ func (s *BucketIterator) produceChunks(startRange *RangeInfo) { // its bucketID is less than len(s.buckets) if c.Index.BucketIndexRight >= len(s.buckets) { select { - case <-s.ctx.Done(): + case <-ctx.Done(): case s.errCh <- errors.New("Wrong Bucket: Bucket index of the checkpoint node is larger than buckets' size"): } return @@ -286,7 +286,7 @@ func (s *BucketIterator) produceChunks(startRange *RangeInfo) { nextUpperValues, err := dbutil.AnalyzeValuesFromBuckets(s.buckets[c.Index.BucketIndexRight].UpperBound, s.indexColumns) if err != nil { select { - case <-s.ctx.Done(): + case <-ctx.Done(): case s.errCh <- errors.Trace(err): } return @@ -306,7 +306,7 @@ func (s *BucketIterator) produceChunks(startRange *RangeInfo) { chunkRange.Update(bound.Column, bound.Upper, "", true, false) } - s.splitChunkForBucket(c.Index.BucketIndexLeft, c.Index.BucketIndexRight, c.Index.ChunkIndex+1, c.Index.ChunkCnt, leftCnt, chunkRange) + s.splitChunkForBucket(ctx, c.Index.BucketIndexLeft, c.Index.BucketIndexRight, c.Index.ChunkIndex+1, c.Index.ChunkCnt, leftCnt, chunkRange) } } halfChunkSize := s.chunkSize >> 1 @@ -322,7 +322,7 @@ func (s *BucketIterator) produceChunks(startRange *RangeInfo) { upperValues, err = dbutil.AnalyzeValuesFromBuckets(s.buckets[i].UpperBound, s.indexColumns) if err != nil { select { - case <-s.ctx.Done(): + case <-ctx.Done(): case s.errCh <- errors.Trace(err): } return @@ -348,10 +348,10 @@ func (s *BucketIterator) produceChunks(startRange *RangeInfo) { if i == firstBucket { // chunkCnt := int((count + halfChunkSize) / s.chunkSize) - s.splitChunkForBucket(firstBucket, i, 0, chunkCnt, chunkCnt, chunkRange) + s.splitChunkForBucket(ctx, firstBucket, i, 0, chunkCnt, chunkCnt, chunkRange) } else { // use multi-buckets so chunkCnt = 1 - s.splitChunkForBucket(firstBucket, i, 0, 1, 1, chunkRange) + s.splitChunkForBucket(ctx, firstBucket, i, 0, 1, 1, chunkRange) } latestCount = s.buckets[i].Count @@ -373,5 +373,5 @@ func (s *BucketIterator) produceChunks(startRange *RangeInfo) { } // When the table is much less than chunkSize, // it will return a chunk include the whole table. - s.splitChunkForBucket(firstBucket, len(s.buckets), 0, 1, 1, chunkRange) + s.splitChunkForBucket(ctx, firstBucket, len(s.buckets), 0, 1, 1, chunkRange) } diff --git a/sync_diff_inspector/splitter/index_fields_test.go b/sync_diff_inspector/splitter/index_fields_test.go index 788df79a116..c1c2a724f2b 100644 --- a/sync_diff_inspector/splitter/index_fields_test.go +++ b/sync_diff_inspector/splitter/index_fields_test.go @@ -17,7 +17,7 @@ import ( "testing" "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" + "github.com/pingcap/tiflow/sync_diff_inspector/utils" "github.com/stretchr/testify/require" ) @@ -30,7 +30,7 @@ func TestIndexFieldsSimple(t *testing.T) { "`c` char(120) NOT NULL DEFAULT '', " + "PRIMARY KEY (`id`), KEY `k_1` (`k`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL1, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) fields, err := indexFieldsFromConfigString("k", tableInfo) @@ -61,7 +61,7 @@ func TestIndexFieldsComposite(t *testing.T) { "KEY `k_1` (`k`)," + "UNIQUE INDEX `c_1` (`c`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL1, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) fields, err := indexFieldsFromConfigString("id, k", tableInfo) @@ -92,7 +92,7 @@ func TestIndexFieldsEmpty(t *testing.T) { "`c` char(120) NOT NULL DEFAULT '', " + "PRIMARY KEY (`id`), KEY `k_1` (`k`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL1, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) fields, err := indexFieldsFromConfigString("", tableInfo) diff --git a/sync_diff_inspector/splitter/splitter_test.go b/sync_diff_inspector/splitter/splitter_test.go index 1326f3c0f52..b301ba0e455 100644 --- a/sync_diff_inspector/splitter/splitter_test.go +++ b/sync_diff_inspector/splitter/splitter_test.go @@ -23,7 +23,6 @@ import ( sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/tidb/pkg/parser" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" "github.com/pingcap/tiflow/sync_diff_inspector/chunk" "github.com/pingcap/tiflow/sync_diff_inspector/source/common" "github.com/pingcap/tiflow/sync_diff_inspector/utils" @@ -142,7 +141,7 @@ func TestSplitRangeByRandom(t *testing.T) { } for _, testCase := range testCases { - tableInfo, err := dbutiltest.GetTableInfoBySQL(testCase.createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(testCase.createTableSQL, parser.New()) require.NoError(t, err) splitCols, err := GetSplitFields(tableInfo, nil) @@ -322,7 +321,7 @@ func TestRandomSpliter(t *testing.T) { } for _, testCase := range testCases { - tableInfo, err := dbutiltest.GetTableInfoBySQL(testCase.createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(testCase.createTableSQL, parser.New()) require.NoError(t, err) info, needUnifiedTimeStamp := utils.ResetColumns(tableInfo, testCase.IgnoreColumns) @@ -357,7 +356,7 @@ func TestRandomSpliter(t *testing.T) { // Test Checkpoint stopJ := 3 - tableInfo, err := dbutiltest.GetTableInfoBySQL(testCases[0].createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(testCases[0].createTableSQL, parser.New()) require.NoError(t, err) tableDiff := &common.TableDiff{ @@ -431,7 +430,7 @@ func TestBucketSpliter(t *testing.T) { require.NoError(t, err) createTableSQL := "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) testCases := []struct { @@ -743,7 +742,7 @@ func TestLimitSpliter(t *testing.T) { ctx := context.Background() createTableSQL := "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) testCases := []struct { @@ -884,7 +883,7 @@ func TestChunkSize(t *testing.T) { require.NoError(t, err) createTableSQL := "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) tableDiff := &common.TableDiff{ @@ -925,7 +924,7 @@ func TestChunkSize(t *testing.T) { require.Equal(t, randomIter.chunkSize, int64(100000)) createTableSQL = "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime)" - tableInfo, err = dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err = utils.GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) tableDiffNoIndex := &common.TableDiff{ diff --git a/sync_diff_inspector/utils/table.go b/sync_diff_inspector/utils/table.go index 4b602bb02fb..c4709d697d6 100644 --- a/sync_diff_inspector/utils/table.go +++ b/sync_diff_inspector/utils/table.go @@ -29,10 +29,10 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + _ "github.com/pingcap/tidb/pkg/planner/core" // to setup expression.EvalSimpleAst for in core_init "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/dbutil" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" "github.com/pingcap/tidb/pkg/util/mock" ) @@ -191,5 +191,47 @@ func GetTableInfo( if err != nil { return nil, errors.Trace(err) } - return dbutiltest.GetTableInfoBySQL(createTableSQL, parser2) + return GetTableInfoBySQL(createTableSQL, parser2) +} + +// GetTableInfoBySQL gets the table info from SQL. +// Here we didn't use dbutiltest.GetTableInfoBySQL because it use buildTableInfoWithCheck internally, +// and the check itself may cause errors in some integration tests. +// See https://github.com/pingcap/tidb-tools/blob/37c2dad9218826a114e3389ac1209367715383ea/pkg/dbutil/table.go#L156-L162 +func GetTableInfoBySQL(createTableSQL string, parser2 *parser.Parser) (table *model.TableInfo, err error) { + stmt, err := parser2.ParseOneStmt(createTableSQL, "", "") + if err != nil { + return nil, errors.Trace(err) + } + + s, ok := stmt.(*ast.CreateTableStmt) + if ok { + table, err := ddl.BuildTableInfoWithStmt(metabuild.NewNonStrictContext(), s, mysql.DefaultCharset, "", nil) + if err != nil { + return nil, errors.Trace(err) + } + + // put primary key in indices + if table.PKIsHandle { + pkIndex := &model.IndexInfo{ + Name: pmodel.NewCIStr("PRIMARY"), + Primary: true, + State: model.StatePublic, + Unique: true, + Tp: pmodel.IndexTypeBtree, + Columns: []*model.IndexColumn{ + { + Name: table.GetPkName(), + Length: types.UnspecifiedLength, + }, + }, + } + + table.Indices = append(table.Indices, pkIndex) + } + + return table, nil + } + + return nil, errors.Errorf("get table info from sql %s failed", createTableSQL) } diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index 354ae30eeb9..dca2ec08329 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -1022,8 +1022,8 @@ func GetSQLFileName(index *chunk.CID) string { return fmt.Sprintf("%d:%d-%d:%d", index.TableIndex, index.BucketIndexLeft, index.BucketIndexRight, index.ChunkIndex) } -// GetCIDFromSQLFileName convert the filename to chunk's `Index`. -func GetCIDFromSQLFileName(fileIDStr string) (int, int, int, int, error) { +// GetChunkIDFromSQLFileName convert the filename to chunk's `Index`. +func GetChunkIDFromSQLFileName(fileIDStr string) (int, int, int, int, error) { ids := strings.Split(fileIDStr, ":") tableIndex, err := strconv.Atoi(ids[0]) if err != nil { diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index d5bd5f8eadc..fc1e82a8789 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/util/dbutil" - "github.com/pingcap/tidb/pkg/util/dbutil/dbutiltest" "github.com/pingcap/tiflow/sync_diff_inspector/chunk" "github.com/stretchr/testify/require" ) @@ -80,7 +79,7 @@ func TestStringsToInterface(t *testing.T) { func TestBasicTableUtilOperation(t *testing.T) { createTableSQL := "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) query, orderKeyCols := GetTableRowsQueryFormat("test", "test", tableInfo, "123") @@ -242,7 +241,7 @@ func TestBasicTableUtilOperation(t *testing.T) { // Test ignore columns createTableSQL = "create table `test`.`test`(`a` int, `c` float, `b` varchar(10), `d` datetime, `e` timestamp, primary key(`a`, `b`), key(`c`, `d`))" - tableInfo, err = dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err = GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) require.Equal(t, len(tableInfo.Indices), 2) @@ -266,7 +265,7 @@ func TestGetCountAndMD5Checksum(t *testing.T) { defer conn.Close() createTableSQL := "create table `test`.`test`(`a` int, `c` float, `b` varchar(10), `d` datetime, primary key(`a`, `b`), key(`c`, `d`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) @@ -286,7 +285,7 @@ func TestGetApproximateMid(t *testing.T) { defer conn.Close() createTableSQL := "create table `test`.`test`(`a` int, `b` varchar(10), primary key(`a`, `b`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) rows := sqlmock.NewRows([]string{"a", "b"}).AddRow("5", "10") @@ -308,7 +307,7 @@ func TestGetApproximateMid(t *testing.T) { func TestGenerateSQLs(t *testing.T) { createTableSQL := "CREATE TABLE `diff_test`.`atest` (`id` int(24), `name` varchar(24), `birthday` datetime, `update_time` time, `money` decimal(20,2), `id_gen` int(11) GENERATED ALWAYS AS ((`id` + 1)) VIRTUAL, primary key(`id`, `name`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) rowsData := map[string]*dbutil.ColumnData{ @@ -327,7 +326,7 @@ func TestGenerateSQLs(t *testing.T) { // test the unique key createTableSQL2 := "CREATE TABLE `diff_test`.`atest` (`id` int(24), `name` varchar(24), `birthday` datetime, `update_time` time, `money` decimal(20,2), unique key(`id`, `name`))" - tableInfo2, err := dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err := GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) replaceSQL = GenerateReplaceDML(rowsData, tableInfo2, "diff_test") deleteSQL = GenerateDeleteDML(rowsData, tableInfo2, "diff_test") @@ -357,7 +356,7 @@ func TestGenerateSQLs(t *testing.T) { func TestResetColumns(t *testing.T) { createTableSQL1 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`))" - tableInfo1, err := dbutiltest.GetTableInfoBySQL(createTableSQL1, parser.New()) + tableInfo1, err := GetTableInfoBySQL(createTableSQL1, parser.New()) require.NoError(t, err) tbInfo, hasTimeStampType := ResetColumns(tableInfo1, []string{"a"}) require.Equal(t, len(tbInfo.Columns), 3) @@ -366,14 +365,14 @@ func TestResetColumns(t *testing.T) { require.False(t, hasTimeStampType) createTableSQL2 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`), index idx(`b`, `c`))" - tableInfo2, err := dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err := GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) tbInfo, _ = ResetColumns(tableInfo2, []string{"a", "b"}) require.Equal(t, len(tbInfo.Columns), 2) require.Equal(t, len(tbInfo.Indices), 0) createTableSQL3 := "CREATE TABLE `test`.`atest` (`a` int, `b` int, `c` int, `d` int, primary key(`a`), index idx(`b`, `c`))" - tableInfo3, err := dbutiltest.GetTableInfoBySQL(createTableSQL3, parser.New()) + tableInfo3, err := GetTableInfoBySQL(createTableSQL3, parser.New()) require.NoError(t, err) tbInfo, _ = ResetColumns(tableInfo3, []string{"b", "c"}) require.Equal(t, len(tbInfo.Columns), 2) @@ -454,7 +453,7 @@ func TestGetBetterIndex(t *testing.T) { }, } tableCase := tableCases[0] - tableInfo, err := dbutiltest.GetTableInfoBySQL(tableCase.createTableSQL, parser.New()) + tableInfo, err := GetTableInfoBySQL(tableCase.createTableSQL, parser.New()) require.NoError(t, err) indices := dbutil.FindAllIndex(tableInfo) for i, index := range indices { @@ -473,7 +472,7 @@ func TestGetBetterIndex(t *testing.T) { require.Equal(t, indices[0].Name.O, tableCase.selected) tableCase = tableCases[1] - tableInfo, err = dbutiltest.GetTableInfoBySQL(tableCase.createTableSQL, parser.New()) + tableInfo, err = GetTableInfoBySQL(tableCase.createTableSQL, parser.New()) require.NoError(t, err) indices = dbutil.FindAllIndex(tableInfo) for i, index := range indices { @@ -510,8 +509,8 @@ func TestGetSQLFileName(t *testing.T) { require.Equal(t, GetSQLFileName(index), "1:2-3:4") } -func TestGetCIDFromSQLFileName(t *testing.T) { - tableIndex, bucketIndexLeft, bucketIndexRight, chunkIndex, err := GetCIDFromSQLFileName("11:12-13:14") +func TestGetChunkIDFromSQLFileName(t *testing.T) { + tableIndex, bucketIndexLeft, bucketIndexRight, chunkIndex, err := GetChunkIDFromSQLFileName("11:12-13:14") require.NoError(t, err) require.Equal(t, tableIndex, 11) require.Equal(t, bucketIndexLeft, 12) @@ -521,7 +520,7 @@ func TestGetCIDFromSQLFileName(t *testing.T) { func TestCompareStruct(t *testing.T) { createTableSQL := "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`), index(`c`))" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) var isEqual bool @@ -532,7 +531,7 @@ func TestCompareStruct(t *testing.T) { // column length different createTableSQL2 := "create table `test`(`a` int, `b` varchar(10), `c` float, primary key(`a`, `b`), index(`c`))" - tableInfo2, err := dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err := GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) isEqual, isPanic = CompareStruct([]*model.TableInfo{tableInfo, tableInfo2}, tableInfo) @@ -541,7 +540,7 @@ func TestCompareStruct(t *testing.T) { // column name differernt createTableSQL2 = "create table `test`(`aa` int, `b` varchar(10), `c` float, `d` datetime, primary key(`aa`, `b`), index(`c`))" - tableInfo2, err = dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err = GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) isEqual, isPanic = CompareStruct([]*model.TableInfo{tableInfo, tableInfo2}, tableInfo) @@ -550,7 +549,7 @@ func TestCompareStruct(t *testing.T) { // column type compatible createTableSQL2 = "create table `test`(`a` int, `b` char(10), `c` float, `d` datetime, primary key(`a`, `b`), index(`c`))" - tableInfo2, err = dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err = GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) isEqual, isPanic = CompareStruct([]*model.TableInfo{tableInfo, tableInfo2}, tableInfo) @@ -558,7 +557,7 @@ func TestCompareStruct(t *testing.T) { require.False(t, isPanic) createTableSQL2 = "create table `test`(`a` int(11), `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`), index(`c`))" - tableInfo2, err = dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err = GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) isEqual, isPanic = CompareStruct([]*model.TableInfo{tableInfo, tableInfo2}, tableInfo) @@ -567,7 +566,7 @@ func TestCompareStruct(t *testing.T) { // column type not compatible createTableSQL2 = "create table `test`(`a` int, `b` varchar(10), `c` int, `d` datetime, primary key(`a`, `b`), index(`c`))" - tableInfo2, err = dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err = GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) isEqual, isPanic = CompareStruct([]*model.TableInfo{tableInfo, tableInfo2}, tableInfo) @@ -576,7 +575,7 @@ func TestCompareStruct(t *testing.T) { // column properties not compatible createTableSQL2 = "create table `test`(`a` int, `b` varchar(11), `c` int, `d` datetime, primary key(`a`, `b`), index(`c`))" - tableInfo2, err = dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err = GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) isEqual, isPanic = CompareStruct([]*model.TableInfo{tableInfo, tableInfo2}, tableInfo) @@ -587,7 +586,7 @@ func TestCompareStruct(t *testing.T) { // index different createTableSQL2 = "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`))" - tableInfo2, err = dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err = GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) isEqual, isPanic = CompareStruct([]*model.TableInfo{tableInfo, tableInfo2}, tableInfo) @@ -598,11 +597,11 @@ func TestCompareStruct(t *testing.T) { // index column different createTableSQL = "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `b`), index(`c`))" - tableInfo, err = dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err = GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) createTableSQL2 = "create table `test`.`test`(`a` int, `b` varchar(10), `c` float, `d` datetime, primary key(`a`, `c`), index(`c`))" - tableInfo2, err = dbutiltest.GetTableInfoBySQL(createTableSQL2, parser.New()) + tableInfo2, err = GetTableInfoBySQL(createTableSQL2, parser.New()) require.NoError(t, err) isEqual, isPanic = CompareStruct([]*model.TableInfo{tableInfo, tableInfo2}, tableInfo) @@ -628,7 +627,7 @@ func TestGenerateSQLBlob(t *testing.T) { } for _, c := range cases { - tableInfo, err := dbutiltest.GetTableInfoBySQL(c.createTableSQL, parser.New()) + tableInfo, err := GetTableInfoBySQL(c.createTableSQL, parser.New()) require.NoError(t, err) replaceSQL := GenerateReplaceDML(rowsData, tableInfo, "diff_test") @@ -640,7 +639,7 @@ func TestGenerateSQLBlob(t *testing.T) { func TestCompareBlob(t *testing.T) { createTableSQL := "create table `test`.`test`(`a` int primary key, `b` blob)" - tableInfo, err := dbutiltest.GetTableInfoBySQL(createTableSQL, parser.New()) + tableInfo, err := GetTableInfoBySQL(createTableSQL, parser.New()) require.NoError(t, err) _, orderKeyCols := GetTableRowsQueryFormat("test", "test", tableInfo, "123") @@ -683,3 +682,15 @@ func TestCompareBlob(t *testing.T) { } } } + +func TestSQLWithInvalidOptions(t *testing.T) { + // Test parse SQL with invalid default value + tblInfo, err := GetTableInfoBySQL("CREATE TABLE `t4` (`create_by` datetime NOT NULL DEFAULT '0000-00-00 00:00:00')", parser.New()) + require.NoError(t, err) + require.Equal(t, tblInfo.Columns[0].DefaultValue.(string), "0000-00-00 00:00:00") + + // Test parse SQL with other charset + tblInfo, err = GetTableInfoBySQL("create table t1 (id int, name varchar(20), primary key(`id`)) character set gbk", parser.New()) + require.NoError(t, err) + require.Equal(t, tblInfo.Charset, "gbk") +} diff --git a/tests/integration_tests/README.md b/tests/integration_tests/README.md index 483f697338f..b2c3d471f85 100644 --- a/tests/integration_tests/README.md +++ b/tests/integration_tests/README.md @@ -14,7 +14,6 @@ If you need to specify a version, os or arch, you can use, for example: `make pr * `pd-ctl` # version >= 6.0.0-rc.1 * `tiflash` # tiflash binary * `libc++.so, libc++abi.so, libgmssl.so, libtiflash_proxy.so` # some necessary so files related to tiflash - * `sync_diff_inspector` * [go-ycsb](https://github.com/pingcap/go-ycsb) * [etcdctl](https://github.com/etcd-io/etcd/tree/master/etcdctl) * [jq](https://stedolan.github.io/jq/)