diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index a63608b2770..b188ffcf5a7 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -109,6 +109,24 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error { return nil } +// GetSchemaVersion returns the schema version of the meta. +func GetSchemaVersion(meta timeta.Reader) (int64, error) { + // After we get the schema version at startTs, if the diff corresponding to that version does not exist, + // it means that the job is not committed yet, so we should subtract one from the version, i.e., version--. + version, err := meta.GetSchemaVersion() + if err != nil { + return 0, errors.Trace(err) + } + diff, err := meta.GetSchemaDiff(version) + if err != nil { + return 0, errors.Trace(err) + } + if diff == nil { + version-- + } + return version, nil +} + // NewSnapshotFromMeta creates a schema snapshot from meta. func NewSnapshotFromMeta( id model.ChangeFeedID, diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 784a1affe00..9feaf16128d 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -73,8 +73,9 @@ type schemaStorage struct { snaps []*schema.Snapshot snapsMu sync.RWMutex - gcTs uint64 - resolvedTs uint64 + gcTs uint64 + resolvedTs uint64 + schemaVersion int64 filter filter.Filter @@ -91,8 +92,9 @@ func NewSchemaStorage( role util.Role, filter filter.Filter, ) (SchemaStorage, error) { var ( - snap *schema.Snapshot - err error + snap *schema.Snapshot + version int64 + err error ) // storage may be nil in some unit test cases. if storage == nil { @@ -103,6 +105,7 @@ func NewSchemaStorage( if err != nil { return nil, errors.Trace(err) } + version, err = schema.GetSchemaVersion(meta) if err != nil { return nil, errors.Trace(err) } @@ -113,6 +116,7 @@ func NewSchemaStorage( forceReplicate: forceReplicate, filter: filter, id: id, + schemaVersion: version, role: role, }, nil } @@ -190,6 +194,7 @@ func (s *schemaStorage) GetLastSnapshot() *schema.Snapshot { // HandleDDLJob creates a new snapshot in storage and handles the ddl job func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { if s.skipJob(job) { + s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) return nil } @@ -198,8 +203,9 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { var snap *schema.Snapshot if len(s.snaps) > 0 { lastSnap := s.snaps[len(s.snaps)-1] - // already-executed DDL could filted by finishedTs. - if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() { + // We use schemaVersion to check if an already-executed DDL job is processed for a second time. + // Unexecuted DDL jobs should have largest schemaVersions. + if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion { log.Info("schemaStorage: ignore foregone DDL", zap.String("namespace", s.id.Namespace), zap.String("changefeed", s.id.ID), @@ -207,6 +213,7 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { zap.String("state", job.State.String()), zap.Int64("jobID", job.ID), zap.Uint64("finishTs", job.BinlogInfo.FinishedTS), + zap.Int64("schemaVersion", s.schemaVersion), zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion), zap.String("role", s.role.String())) return nil @@ -228,6 +235,7 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { return errors.Trace(err) } s.snaps = append(s.snaps, snap) + s.schemaVersion = job.BinlogInfo.SchemaVersion s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS) log.Info("schemaStorage: update snapshot by the DDL job", zap.String("namespace", s.id.Namespace), @@ -236,6 +244,7 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error { zap.String("table", job.TableName), zap.String("query", job.Query), zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS), + zap.Uint64("schemaVersion", uint64(s.schemaVersion)), zap.String("role", s.role.String())) return nil } diff --git a/tests/integration_tests/ddl_with_exists/run.sh b/tests/integration_tests/ddl_with_exists/run.sh index b7810ac2e8f..8115ce4ed1a 100755 --- a/tests/integration_tests/ddl_with_exists/run.sh +++ b/tests/integration_tests/ddl_with_exists/run.sh @@ -52,6 +52,6 @@ function run() { } trap stop_tidb_cluster EXIT -# run $* +run $* check_logs $WORK_DIR echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"