Merge branch 'master' into debug-ddl
CharlesCheung96 authored Dec 21, 2024
2 parents 5ae59f8 + 7f57e1f commit 4a6d7b0
Showing 95 changed files with 2,661 additions and 1,249 deletions.
12 changes: 0 additions & 12 deletions .github/workflows/dataflow_engine_chaos.yaml
@@ -339,15 +339,3 @@ jobs:
name: chaos-base-logs.${{ matrix.chaos-obj }}
path: |
./logs
# Send feishu notification if failed.
- name: Feishu notification
continue-on-error: true
uses: foxundermoon/feishu-action@v2
if: ${{ failure() }}
with:
url: ${{ secrets.ENGINE_FEISHU_NOTIFY_URL }}
msg_type: text
content: |
text: |
dataflow engine chaos job failed, see https://github.com/pingcap/tiflow/actions/runs/${{ github.run_id }}
44 changes: 33 additions & 11 deletions .github/workflows/dm_binlog_999999.yaml
@@ -14,6 +14,11 @@ jobs:
test-binlog-999999:
name: Test binlog 999999
runs-on: ubuntu-20.04
services:
docker:
image: docker:19.03.12
options: >-
--privileged
steps:
- name: Set up Go env
@@ -30,6 +35,14 @@ jobs:
with:
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Set DOCKER_HOST
run: echo "export DOCKER_HOST=unix:///var/run/docker.sock" >> $GITHUB_ENV

- name: Install docker-compose
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
- name: Cache go modules
uses: actions/cache@v2
with:
@@ -48,7 +61,9 @@ jobs:

- name: Setup CI environment
run: |
docker-compose -f ./dm/tests/binlog_999999/docker-compose.yml up -d
sudo apt-get update
sudo apt-get install -y curl
sudo docker-compose -f ./dm/tests/binlog_999999/docker-compose.yml up -d
curl http://download.pingcap.org/tidb-enterprise-tools-nightly-linux-amd64.tar.gz | tar xz
mv tidb-enterprise-tools-nightly-linux-amd64/bin/sync_diff_inspector bin/
curl http://download.pingcap.org/tidb-nightly-linux-amd64.tar.gz | tar xz
Expand All @@ -57,7 +72,7 @@ jobs:
chmod +x minio
mv minio bin/
- name: change binlog sequence number to 999998
- name: Change binlog sequence number to 999999
run: |
while ! mysqladmin -h127.0.0.1 -P3306 -p123456 -uroot ping --connect-timeout=1 > /dev/null 2>&1 ; do
echo "wait mysql"
@@ -87,11 +102,16 @@ jobs:
sleep 1
done
echo "show binary logs;" | mysql -uroot -h127.0.0.1 -P3306 -p123456
echo "show binary logs;" | mysql -uroot -h127.0.0.1 -P3307 -p123456
- name: Run test cases
run: |
RESET_MASTER=false make dm_integration_test CASE=incremental_mode
echo "show binary logs;" | mysql -uroot -h127.0.0.1 -P3306 -p123456 | grep -q "mysql-bin.1000000"
echo "show binary logs;" | mysql -uroot -h127.0.0.1 -P3307 -p123456 | grep -q "mysql-bin.1000000"
echo "show binary logs;" | mysql -uroot -h127.0.0.1 -P3306 -p123456
echo "show binary logs;" | mysql -uroot -h127.0.0.1 -P3307 -p123456
echo "show binary logs;" | mysql -uroot -h127.0.0.1 -P3306 -p123456 | grep -q "mysql-bin.1000003"
echo "show binary logs;" | mysql -uroot -h127.0.0.1 -P3307 -p123456 | grep -q "mysql-bin.1000002"
- name: Copy logs to hack permission
if: ${{ always() }}
@@ -112,12 +132,14 @@ jobs:
path: |
./logs
# send Slack notify if failed.
# NOTE: With the exception of `GITHUB_TOKEN`, secrets are not passed to the runner when a workflow is triggered from a forked repository.
- name: Slack notification
# Send feishu notification if failed.
- name: Feishu notification
continue-on-error: true
uses: foxundermoon/feishu-action@v2
if: ${{ failure() }}
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_NOTIFY }}
uses: Ilshidur/[email protected]
with:
args: "binlog 999999 failed, see https://github.com/pingcap/tiflow/actions/runs/{{ GITHUB_RUN_ID }}"
url: ${{ secrets.ENGINE_FEISHU_NOTIFY_URL }}
msg_type: text
content: |
text: |
binlog 999999 job failed, see https://github.com/pingcap/tiflow/actions/runs/${{ github.run_id }}
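Both dm workflows swap the old Slack step for `foxundermoon/feishu-action@v2` (the same step this merge removes from the chaos workflow), which amounts to an HTTP POST of a small JSON payload to the webhook stored in `secrets.ENGINE_FEISHU_NOTIFY_URL`. Below is a minimal Go sketch of the equivalent call, not the action's actual implementation; the payload shape mirrors the step's `msg_type: text` and `content` inputs, and the environment variable name is a placeholder:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
)

// feishuText mirrors the action's inputs: a "text" message with one content field.
type feishuText struct {
	MsgType string `json:"msg_type"`
	Content struct {
		Text string `json:"text"`
	} `json:"content"`
}

// notifyFeishu posts a plain-text message to a Feishu bot webhook.
func notifyFeishu(webhookURL, text string) error {
	var msg feishuText
	msg.MsgType = "text"
	msg.Content.Text = text

	body, err := json.Marshal(&msg)
	if err != nil {
		return err
	}
	resp, err := http.Post(webhookURL, "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("feishu webhook returned %s", resp.Status)
	}
	return nil
}

func main() {
	// FEISHU_WEBHOOK_URL stands in for secrets.ENGINE_FEISHU_NOTIFY_URL.
	url := os.Getenv("FEISHU_WEBHOOK_URL")
	if err := notifyFeishu(url, "binlog 999999 job failed"); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```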
19 changes: 12 additions & 7 deletions .github/workflows/dm_upstream_switch.yaml
@@ -48,6 +48,8 @@ jobs:

- name: Setup containers
run: |
sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
docker-compose -f ./dm/tests/upstream_switch/docker-compose.yml up -d
- name: Run test cases
@@ -73,12 +75,15 @@ jobs:
path: |
./logs
# send Slack notify if failed.
# NOTE: With the exception of `GITHUB_TOKEN`, secrets are not passed to the runner when a workflow is triggered from a forked repository.
- name: Slack notification
# Send feishu notification if failed.
- name: Feishu notification
continue-on-error: true
uses: foxundermoon/feishu-action@v2
if: ${{ failure() }}
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_NOTIFY }}
uses: Ilshidur/[email protected]
with:
args: "upstream-switch job failed, see https://github.com/pingcap/tiflow/actions/runs/{{ GITHUB_RUN_ID }}"
url: ${{ secrets.ENGINE_FEISHU_NOTIFY_URL }}
msg_type: text
content: |
text: |
dm upstream switch job failed, see https://github.com/pingcap/tiflow/actions/runs/${{ github.run_id }}
2 changes: 1 addition & 1 deletion cdc/api/v2/api.go
@@ -86,7 +86,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {

// owner apis
ownerGroup := v2.Group("/owner")
unsafeGroup.Use(ownerMiddleware)
ownerGroup.Use(ownerMiddleware)
ownerGroup.POST("/resign", api.resignOwner)

// common APIs
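The cdc/api/v2/api.go change is a one-line bug fix: the owner middleware was being attached to `unsafeGroup` instead of `ownerGroup`, so the `/owner/resign` route was registered without it. A stripped-down gin sketch of group-scoped middleware; the middleware and handler bodies here are stand-ins, not CDC's real ones:

```go
package main

import "github.com/gin-gonic/gin"

// ownerMiddleware stands in for CDC's real middleware that forwards
// requests to the owner node; here it only tags the request context.
func ownerMiddleware(c *gin.Context) {
	c.Set("forwarded-to-owner", true)
	c.Next()
}

func main() {
	router := gin.New()
	v2 := router.Group("/api/v2")

	ownerGroup := v2.Group("/owner")
	// Use attaches the middleware to this group only. Calling Use on a
	// different group (the original bug) leaves these routes uncovered.
	ownerGroup.Use(ownerMiddleware)
	ownerGroup.POST("/resign", func(c *gin.Context) {
		c.JSON(200, gin.H{"forwarded": c.GetBool("forwarded-to-owner")})
	})

	_ = router.Run(":8080")
}
```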
3 changes: 0 additions & 3 deletions cdc/api/v2/api_helpers.go
@@ -231,9 +231,6 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
return nil, errors.Cause(err)
}
if !replicaCfg.ForceReplicate && !cfg.ReplicaConfig.IgnoreIneligibleTable {
if err != nil {
return nil, err
}
if len(ineligibleTables) != 0 {
return nil, cerror.ErrTableIneligible.GenWithStackByArgs(ineligibleTables)
}
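The block deleted from verifyCreateChangefeedConfig was dead code: `err` is already checked and returned (via `errors.Cause(err)`) just above, so it is always nil by the time the ineligible-table branch runs. A schematic Go sketch of the pattern, with hypothetical names standing in for the real verification flow:

```go
package main

import "fmt"

// verifyTables is a hypothetical stand-in for the changefeed verification flow.
func verifyTables(forceReplicate, ignoreIneligible bool, ineligibleTables []string, err error) error {
	if err != nil {
		// The only reachable error return: err is handled here first.
		return err
	}
	if !forceReplicate && !ignoreIneligible {
		// A second `if err != nil` here (the deleted code) could never fire,
		// because err is provably nil at this point.
		if len(ineligibleTables) != 0 {
			return fmt.Errorf("some tables are not eligible to replicate: %v", ineligibleTables)
		}
	}
	return nil
}

func main() {
	fmt.Println(verifyTables(false, false, []string{"test.no_pk"}, nil))
}
```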
88 changes: 12 additions & 76 deletions cdc/entry/mounter.go
@@ -38,7 +38,6 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@@ -65,19 +64,6 @@ type rowKVEntry struct {
PreRowExist bool
}

// DDLTableInfo contains the tableInfo about tidb_ddl_job and tidb_ddl_history
// and the column id of `job_meta` in these two tables.
type DDLTableInfo struct {
// ddlJobsTable use to parse all ddl jobs except `create table`
DDLJobTable *model.TableInfo
// It holds the column id of `job_meta` in table `tidb_ddl_jobs`.
JobMetaColumnIDinJobTable int64
// ddlHistoryTable only use to parse `create table` ddl job
DDLHistoryTable *model.TableInfo
// It holds the column id of `job_meta` in table `tidb_ddl_history`.
JobMetaColumnIDinHistoryTable int64
}

// Mounter is used to parse SQL events from KV events
type Mounter interface {
// DecodeEvent accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
@@ -306,89 +292,39 @@ func IsLegacyFormatJob(rawKV *model.RawKVEntry) bool {
return bytes.HasPrefix(rawKV.Key, metaPrefix)
}

// ParseDDLJob parses the job from the raw KV entry.
func ParseDDLJob(rawKV *model.RawKVEntry, ddlTableInfo *DDLTableInfo) (*timodel.Job, error) {
// ParseDDLJob parses the job from the raw KV entry. id is the column id of `job_meta`.
func ParseDDLJob(tblInfo *model.TableInfo, rawKV *model.RawKVEntry, id int64) (*timodel.Job, error) {
var v []byte
var datum types.Datum

// for test case only
if bytes.HasPrefix(rawKV.Key, metaPrefix) {
// old queue base job.
v = rawKV.Value
job, err := parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
if err != nil || job == nil {
job, err = parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
}
return job, err
}

recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
if err != nil {
return nil, errors.Trace(err)
}

tableID := tablecodec.DecodeTableID(rawKV.Key)

// parse it with tidb_ddl_job
if tableID == spanz.JobTableID {
row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLJobTable, time.UTC)
} else {
// DDL job comes from `tidb_ddl_job` table after we support concurrent DDL. We should decode the job from the column.
recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
if err != nil {
return nil, errors.Trace(err)
}
datum = row[ddlTableInfo.JobMetaColumnIDinJobTable]
v = datum.GetBytes()

return parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
} else if tableID == spanz.JobHistoryID {
// parse it with tidb_ddl_history
row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLHistoryTable, time.UTC)
row, err := decodeRow(rawKV.Value, recordID, tblInfo, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
datum = row[ddlTableInfo.JobMetaColumnIDinHistoryTable]
datum := row[id]
v = datum.GetBytes()

return parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
}

return nil, fmt.Errorf("Unvalid tableID %v in rawKV.Key", tableID)
return parseJob(v, rawKV.StartTs, rawKV.CRTs)
}

// parseJob unmarshal the job from "v".
// fromHistoryTable is used to distinguish the job is from tidb_dd_job or tidb_ddl_history
// We need to be compatible with the two modes, enable_fast_create_table=on and enable_fast_create_table=off
// When enable_fast_create_table=on, `create table` will only be inserted into tidb_ddl_history after being executed successfully.
// When enable_fast_create_table=off, `create table` just like other ddls will be firstly inserted to tidb_ddl_job,
// and being inserted into tidb_ddl_history after being executed successfully.
// In both two modes, other ddls are all firstly inserted into tidb_ddl_job, and then inserted into tidb_ddl_history after being executed successfully.
//
// To be compatible with these two modes, we will get `create table` ddl from tidb_ddl_history, and all ddls from tidb_ddl_job.
// When enable_fast_create_table=off, for each `create table` ddl we will get twice(once from tidb_ddl_history, once from tidb_ddl_job)
// Because in `handleJob` we will skip the repeated ddls, thus it's ok for us to get `create table` twice.
// Besides, the `create table` from tidb_ddl_job always have a earlier commitTs than from tidb_ddl_history.
// Therefore, we always use the commitTs of ddl from `tidb_ddl_job` as StartTs, which ensures we can get all the dmls.
func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*timodel.Job, error) {
func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
var job timodel.Job
err := json.Unmarshal(v, &job)
if err != nil {
return nil, errors.Trace(err)
}

if fromHistoryTable {
// we only want to get `create table` and `create tables` ddl from tidb_ddl_history, so we just throw out others ddls.
// We only want the job with `JobStateSynced`, which is means the ddl job is done successfully.
// Besides, to satisfy the subsequent processing,
// We need to set the job to be Done to make it will replay in schemaStorage
if (job.Type != timodel.ActionCreateTable && job.Type != timodel.ActionCreateTables) || job.State != timodel.JobStateSynced {
return nil, nil
}
job.State = timodel.JobStateDone
} else {
// we need to get all ddl job which is done from tidb_ddl_job
if !job.IsDone() {
return nil, nil
}
if !job.IsDone() {
return nil, nil
}

// FinishedTS is only set when the job is synced,
// but we can use the entry's ts here
job.StartTS = startTs
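After the merge, ParseDDLJob is back to a single-table signature: callers pass the schema of the DDL table plus the column id of its `job_meta` column, and parseJob no longer needs the `fromHistoryTable` flag. A hypothetical call site under those assumptions; the helper name and the tidb import path for `timodel` are guesses, while `entry.ParseDDLJob` and the tiflow types come from the diff above:

```go
package ddlhelper

import (
	timodel "github.com/pingcap/tidb/pkg/meta/model" // assumed path for the Job type

	"github.com/pingcap/tiflow/cdc/entry"
	"github.com/pingcap/tiflow/cdc/model"
)

// decodeDDLJob is a hypothetical helper around the restored signature.
// jobMetaColumnID must be the column id of `job_meta` in tidb_ddl_job.
func decodeDDLJob(
	ddlJobTable *model.TableInfo,
	jobMetaColumnID int64,
	rawKV *model.RawKVEntry,
) (*timodel.Job, error) {
	job, err := entry.ParseDDLJob(ddlJobTable, rawKV, jobMetaColumnID)
	if err != nil {
		return nil, err
	}
	// parseJob returns (nil, nil) for jobs that are not done yet, so a nil
	// job with a nil error simply means "skip this entry".
	return job, nil
}
```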
18 changes: 0 additions & 18 deletions cdc/entry/schema/snapshot.go
@@ -109,24 +109,6 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error {
return nil
}

// GetSchemaVersion returns the schema version of the meta.
func GetSchemaVersion(meta timeta.Reader) (int64, error) {
// After we get the schema version at startTs, if the diff corresponding to that version does not exist,
// it means that the job is not committed yet, so we should subtract one from the version, i.e., version--.
version, err := meta.GetSchemaVersion()
if err != nil {
return 0, errors.Trace(err)
}
diff, err := meta.GetSchemaDiff(version)
if err != nil {
return 0, errors.Trace(err)
}
if diff == nil {
version--
}
return version, nil
}

// NewSnapshotFromMeta creates a schema snapshot from meta.
func NewSnapshotFromMeta(
id model.ChangeFeedID,
(The remaining changed files of this commit are not shown here.)
