From d7a6233c6c5450b8144af144eb5b4ce8ce404264 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 2 Jan 2025 13:28:55 +0800 Subject: [PATCH 1/6] try fix --- cdc/model/sink.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index cc33d2b1008..d1286adb9de 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1092,6 +1092,10 @@ func (d *DDLEvent) FromJobWithArgs( case model.ActionDropView: d.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", d.TableInfo.TableName.Schema, d.TableInfo.TableName.Table) + case model.ActionRenameTable: + d.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`", + preTableInfo.TableName.Schema, preTableInfo.TableName.Table, + tableInfo.TableName.Schema, tableInfo.TableName.Table) case model.ActionRenameTables: oldTableName := preTableInfo.Name.O newTableName := tableInfo.Name.O From a4c94fc9c38ebba59c30b149bba7dfe7aa760ba8 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 2 Jan 2025 14:39:18 +0800 Subject: [PATCH 2/6] add test --- tests/integration_tests/ddl_manager/data/prepare.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests/ddl_manager/data/prepare.sql b/tests/integration_tests/ddl_manager/data/prepare.sql index 444106d9dff..69443c69faa 100644 --- a/tests/integration_tests/ddl_manager/data/prepare.sql +++ b/tests/integration_tests/ddl_manager/data/prepare.sql @@ -113,7 +113,7 @@ CREATE TABLE t1 ( col0 INT NOT NULL ); -RENAME TABLE `cross_db_1`.`t1` TO `cross_db_2`.`t1`; +RENAME TABLE `t1` TO `cross_db_2`.`t1`; CREATE TABLE `cross_db_1`.`t2` like `cross_db_2`.`t1`; From 7909a4ad68a4a8e0df57c67b1847974b5efe8de3 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 2 Jan 2025 14:54:26 +0800 Subject: [PATCH 3/6] add some log --- cdc/entry/schema/snapshot.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index a63608b2770..b74dded5791 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -134,6 +134,7 @@ func NewSnapshotFromMeta( // `tag` is used to reverse sort all versions in the generated snapshot. tag := negative(currentTs) for _, dbinfo := range dbinfos { + log.Info("load database", zap.Int64("dbID", dbinfo.ID), zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id)) if filter.ShouldIgnoreSchema(dbinfo.Name.O) { log.Debug("ignore database", zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id)) continue @@ -178,6 +179,7 @@ func NewSnapshotFromMeta( } for _, tableInfo := range tableInfos { + log.Info("load table", zap.Int64("tableID", tableInfo.ID), zap.Stringer("name", tableInfo.Name), zap.Stringer("changefeed", id)) tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) tableCount++ snap.inner.tables.ReplaceOrInsert(versionedID{ From 9626b531ec5f301d682f303fad1691fdd43d26ee Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 2 Jan 2025 15:01:19 +0800 Subject: [PATCH 4/6] add some log --- cdc/entry/schema/snapshot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index b74dded5791..6cde787dd25 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -134,7 +134,7 @@ func NewSnapshotFromMeta( // `tag` is used to reverse sort all versions in the generated snapshot. tag := negative(currentTs) for _, dbinfo := range dbinfos { - log.Info("load database", zap.Int64("dbID", dbinfo.ID), zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id)) + log.Info("load database", zap.Uint64("currentTs", currentTs), zap.Int64("dbID", dbinfo.ID), zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id)) if filter.ShouldIgnoreSchema(dbinfo.Name.O) { log.Debug("ignore database", zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id)) continue From deee90da1f18cab0ab67805f944b584b95c31a66 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 2 Jan 2025 17:02:54 +0800 Subject: [PATCH 5/6] try fix --- cdc/model/sink.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index d1286adb9de..30b8118e9b0 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1093,8 +1093,11 @@ func (d *DDLEvent) FromJobWithArgs( d.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", d.TableInfo.TableName.Schema, d.TableInfo.TableName.Table) case model.ActionRenameTable: + // Note: preTableInfo may not be accurate for rename table + // because if rename table ddl' finished ts is 100, + // TODO: add more explanation d.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`", - preTableInfo.TableName.Schema, preTableInfo.TableName.Table, + job.InvolvingSchemaInfo[0].Database, job.InvolvingSchemaInfo[0].Table, tableInfo.TableName.Schema, tableInfo.TableName.Table) case model.ActionRenameTables: oldTableName := preTableInfo.Name.O From c99eece8b53504f9a0cb8214e3c6a1bae4486a46 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 2 Jan 2025 17:22:58 +0800 Subject: [PATCH 6/6] add more comment --- cdc/entry/schema/snapshot.go | 2 -- cdc/model/sink.go | 8 +++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 6cde787dd25..a63608b2770 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -134,7 +134,6 @@ func NewSnapshotFromMeta( // `tag` is used to reverse sort all versions in the generated snapshot. tag := negative(currentTs) for _, dbinfo := range dbinfos { - log.Info("load database", zap.Uint64("currentTs", currentTs), zap.Int64("dbID", dbinfo.ID), zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id)) if filter.ShouldIgnoreSchema(dbinfo.Name.O) { log.Debug("ignore database", zap.Stringer("db", dbinfo.Name), zap.Stringer("changefeed", id)) continue @@ -179,7 +178,6 @@ func NewSnapshotFromMeta( } for _, tableInfo := range tableInfos { - log.Info("load table", zap.Int64("tableID", tableInfo.ID), zap.Stringer("name", tableInfo.Name), zap.Stringer("changefeed", id)) tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) tableCount++ snap.inner.tables.ReplaceOrInsert(versionedID{ diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 30b8118e9b0..8191465b32b 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1093,9 +1093,11 @@ func (d *DDLEvent) FromJobWithArgs( d.Query = fmt.Sprintf("DROP VIEW `%s`.`%s`", d.TableInfo.TableName.Schema, d.TableInfo.TableName.Table) case model.ActionRenameTable: - // Note: preTableInfo may not be accurate for rename table - // because if rename table ddl' finished ts is 100, - // TODO: add more explanation + // Note: preTableInfo may not be accurate for rename table. + // after pr: https://github.com/pingcap/tidb/pull/43341, + // assume there is a table `test.t` and a ddl: `rename table t to test2.t;`, and its commit ts is `100`. + // if you get a ddl snapshot at ts `99`, table `t` is already in `test2`. + // so preTableInfo.TableName.Schema will also be `test2`. d.Query = fmt.Sprintf("RENAME TABLE `%s`.`%s` TO `%s`.`%s`", job.InvolvingSchemaInfo[0].Database, job.InvolvingSchemaInfo[0].Table, tableInfo.TableName.Schema, tableInfo.TableName.Table)