
*: last bound source is prior to other source when pick for bound (#1373) #1397

Merged · 1 commit · Jan 23, 2021
7 changes: 6 additions & 1 deletion dm/common/common.go
@@ -38,6 +38,10 @@ var (
// UpstreamBoundWorkerKeyAdapter is used to store the bound relationship between a DM-worker and the upstream MySQL source it handles.
// k/v: Encode(name) -> the bound relationship.
UpstreamBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/bound-worker/")
// UpstreamLastBoundWorkerKeyAdapter is used to store the last bound relationship between a DM-worker and an upstream MySQL source.
// different from UpstreamBoundWorkerKeyAdapter, this kv is not deleted when the worker is unbound, so it can provide a priority when rebinding
// k/v: Encode(name) -> the bound relationship.
UpstreamLastBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/last-bound-worker/")
// TaskConfigKeyAdapter used to store task config string.
// k/v: Encode(task-name) -> task-config-string
TaskConfigKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/task/")
@@ -77,7 +81,8 @@ var (
func keyAdapterKeysLen(s KeyAdapter) int {
switch s {
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter,
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter:
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter,
UpstreamLastBoundWorkerKeyAdapter:
return 1
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter,
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
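For context on how these adapters turn a worker name into an etcd key: each key segment is hex-encoded and appended to the adapter's fixed prefix. Below is a minimal, self-contained sketch of that behavior, assuming a single-segment key; the method shapes are simplified stand-ins, not the real dm/common KeyAdapter interface.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// keyHexEncoderDecoder sketches a one-level key adapter: the key segment is
// hex-encoded and appended to a fixed etcd prefix.
type keyHexEncoderDecoder string

// Encode builds the full etcd key for the given segment.
func (k keyHexEncoderDecoder) Encode(name string) string {
	return string(k) + hex.EncodeToString([]byte(name))
}

// Decode recovers the original segment from a full etcd key.
func (k keyHexEncoderDecoder) Decode(key string) (string, error) {
	b, err := hex.DecodeString(strings.TrimPrefix(key, string(k)))
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	lastBound := keyHexEncoderDecoder("/dm-master/last-bound-worker/")
	key := lastBound.Encode("dm-worker-1")
	fmt.Println(key) // /dm-master/last-bound-worker/646d2d776f726b65722d31
	name, _ := lastBound.Decode(key)
	fmt.Println(name) // dm-worker-1
}
```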
57 changes: 46 additions & 11 deletions dm/master/scheduler/scheduler.go
@@ -111,6 +111,9 @@ type Scheduler struct {
// - when the source is bound to a worker.
unbounds map[string]struct{}

// a mirror of bounds whose elements are not deleted when the worker is unbound. worker-name -> SourceBound
lastBound map[string]ha.SourceBound

// expectant relay stages for sources, source ID -> stage.
// add:
// - bound the source to a worker (at first time).
@@ -143,6 +146,7 @@ func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler {
workers: make(map[string]*Worker),
bounds: make(map[string]*Worker),
unbounds: make(map[string]struct{}),
lastBound: make(map[string]ha.SourceBound),
expectRelayStages: make(map[string]ha.Stage),
expectSubTaskStages: make(map[string]map[string]ha.Stage),
securityCfg: securityCfg,
@@ -906,6 +910,11 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) {
if err != nil {
return 0, err
}
lastSourceBoundM, _, err := ha.GetLastSourceBounds(cli)
if err != nil {
return 0, err
}
s.lastBound = lastSourceBoundM

// 3. get all history offline status.
kam, rev, err := ha.GetKeepAliveWorkers(cli)
@@ -1164,14 +1173,22 @@ func (s *Scheduler) handleWorkerOffline(ev ha.WorkerEvent, toLock bool) error {
return nil
}

// tryBoundForWorker tries to bound a random unbounded source to the worker.
// tryBoundForWorker tries to bind a source to the worker: first try the last source bound to this worker, then
// randomly pick one from the unbound sources.
// returns (true, nil) after the worker is bound.
func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// 1. check whether any unbound source exists.
var source string
for source = range s.unbounds {
break // got a source.
// 1. check whether the worker's last bound source is still available.
// if lastBound is not found, or that source has already been bound to another worker (we also check that the source
// still exists here), randomly pick one from unbounds.
// NOTE: if the worker isn't in lastBound, we get a zero-value SourceBound, which is fine because the empty string
// is never a key of unbounds.
source := s.lastBound[w.baseInfo.Name].Source
if _, ok := s.unbounds[source]; !ok {
source = ""
for source = range s.unbounds {
break // got a source.
}
}

if source == "" {
s.logger.Info("no unbound sources need to bound", zap.Stringer("worker", w.BaseInfo()))
return false, nil
@@ -1198,14 +1215,31 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {
// tryBoundForSource tries to bound a source to a random Free worker.
// returns (true, nil) after bounded.
func (s *Scheduler) tryBoundForSource(source string) (bool, error) {
// 1. try to find a random Free worker.
// 1. try the worker this source was last bound to, then fall back to a random Free worker.
var worker *Worker
for _, w := range s.workers {
if w.Stage() == WorkerFree {
worker = w
break
for workerName, bound := range s.lastBound {
if bound.Source == source {
w, ok := s.workers[workerName]
if !ok {
// the recorded worker no longer exists, skip it
continue
}
if w.Stage() == WorkerFree {
worker = w
break
}
}
}

if worker == nil {
for _, w := range s.workers {
if w.Stage() == WorkerFree {
worker = w
break
}
}
}

if worker == nil {
s.logger.Info("no free worker exists for bound", zap.String("source", source))
return false, nil
@@ -1280,14 +1314,15 @@ func (s *Scheduler) deleteWorker(name string) {

// updateStatusForBound updates the in-memory status for bound, including:
// - update the stage of worker to `Bound`.
// - record the bound relationship in the scheduler.
// - record the bound relationship and last bound relationship in the scheduler.
// this func is called after the bound relationship has been persisted in etcd.
func (s *Scheduler) updateStatusForBound(w *Worker, b ha.SourceBound) error {
err := w.ToBound(b)
if err != nil {
return err
}
s.bounds[b.Source] = w
s.lastBound[b.Worker] = b
return nil
}

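The selection policy introduced above can be distilled into a few lines. The following is a simplified sketch with stand-in types (not the real Scheduler structures): a worker prefers its last bound source when that source is still unbound, and otherwise takes any unbound source. tryBoundForSource applies the same preference in the opposite direction, first looking for the worker recorded in lastBound and only then for any Free worker.

```go
package main

import "fmt"

// SourceBound is a simplified stand-in for ha.SourceBound.
type SourceBound struct {
	Source string
	Worker string
}

// Scheduler is a stand-in holding just the two maps involved in the policy.
type Scheduler struct {
	unbounds  map[string]struct{}    // unbound source IDs
	lastBound map[string]SourceBound // worker name -> last bound source
}

// pickSourceForWorker prefers the worker's last bound source if it is still
// unbound, and otherwise falls back to any unbound source.
func (s *Scheduler) pickSourceForWorker(worker string) string {
	// The zero-value lookup is safe: a worker missing from lastBound yields
	// Source == "", and the empty string is never a key of unbounds.
	if source := s.lastBound[worker].Source; source != "" {
		if _, ok := s.unbounds[source]; ok {
			return source
		}
	}
	for source := range s.unbounds {
		return source // any unbound source; map iteration order is random
	}
	return "" // nothing to bind
}

func main() {
	s := &Scheduler{
		unbounds: map[string]struct{}{
			"mysql-replica-1": {},
			"mysql-replica-2": {},
		},
		lastBound: map[string]SourceBound{
			"dm-worker-1": {Source: "mysql-replica-2", Worker: "dm-worker-1"},
		},
	}
	fmt.Println(s.pickSourceForWorker("dm-worker-1")) // mysql-replica-2: history wins
	fmt.Println(s.pickSourceForWorker("dm-worker-9")) // either source: no history
}
```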
74 changes: 74 additions & 0 deletions dm/master/scheduler/scheduler_test.go
@@ -944,3 +944,77 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) {
cancel3()
wg.Wait()
}

func (t *testScheduler) TestLastBound(c *C) {
defer clearTestInfoOperation(c)

var (
logger = log.L()
s = NewScheduler(&logger, config.Security{})
sourceID1 = "mysql-replica-1"
sourceID2 = "mysql-replica-2"
workerName1 = "dm-worker-1"
workerName2 = "dm-worker-2"
workerName3 = "dm-worker-3"
workerName4 = "dm-worker-4"
sourceCfg1 config.SourceConfig
)

c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil)
sourceCfg1.SourceID = sourceID1
sourceCfg2 := sourceCfg1
sourceCfg2.SourceID = sourceID2
worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}}
worker2 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName2}}
worker3 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName3}}
worker4 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName4}}

// step 1: start an empty scheduler without listening for worker events
s.started = true
s.etcdCli = etcdTestCli
s.workers[workerName1] = worker1
s.workers[workerName2] = worker2
s.workers[workerName3] = worker3
s.workers[workerName4] = worker4

s.lastBound[workerName1] = ha.SourceBound{Source: sourceID1}
s.lastBound[workerName2] = ha.SourceBound{Source: sourceID2}
s.unbounds[sourceID1] = struct{}{}
s.unbounds[sourceID2] = struct{}{}

// worker1 goes back to its last bound source
worker1.ToFree()
bounded, err := s.tryBoundForWorker(worker1)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)
c.Assert(s.bounds[sourceID1], DeepEquals, worker1)

// worker3 has to be bound to source2
worker3.ToFree()
bounded, err = s.tryBoundForWorker(worker3)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)
c.Assert(s.bounds[sourceID2], DeepEquals, worker3)

// though worker2 has a previous source, that source is no longer available, so worker2 is not bound
worker2.ToFree()
bounded, err = s.tryBoundForWorker(worker2)
c.Assert(err, IsNil)
c.Assert(bounded, IsFalse)

// worker4 is used to test whether source2 will be bound to worker2 rather than a new worker
worker4.ToFree()
bounded, err = s.tryBoundForWorker(worker4)
c.Assert(err, IsNil)
c.Assert(bounded, IsFalse)

// after worker3 becomes offline, source2 should be bound to worker2
s.updateStatusForUnbound(sourceID2)
_, ok := s.bounds[sourceID2]
c.Assert(ok, IsFalse)
worker3.ToOffline()
bounded, err = s.tryBoundForSource(sourceID2)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)
c.Assert(s.bounds[sourceID2], DeepEquals, worker2)
}
45 changes: 38 additions & 7 deletions pkg/ha/bound.go
@@ -86,11 +86,11 @@ func sourceBoundFromJSON(s string) (b SourceBound, err error) {
func PutSourceBound(cli *clientv3.Client, bounds ...SourceBound) (int64, error) {
ops := make([]clientv3.Op, 0, len(bounds))
for _, bound := range bounds {
op, err := putSourceBoundOp(bound)
boundOps, err := putSourceBoundOp(bound)
if err != nil {
return 0, err
}
ops = append(ops, op)
ops = append(ops, boundOps...)
}
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
@@ -137,6 +137,29 @@ func GetSourceBound(cli *clientv3.Client, worker string) (map[string]SourceBound
return sbm, resp.Header.Revision, nil
}

// GetLastSourceBounds gets all last source bound relationships. Different from GetSourceBound, a last source bound
// is not deleted when the worker goes offline
func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

sbm := make(map[string]SourceBound)
resp, err := cli.Get(ctx, common.UpstreamLastBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix())

if err != nil {
return sbm, 0, err
}

sbm, err = sourceBoundFromResp("", resp)
if err != nil {
return sbm, 0, err
}

return sbm, resp.Header.Revision, nil
}

// GetSourceBoundConfig gets the source bound relationship and relative source config at the same time
// for the specified DM-worker. The index worker **must not be empty**:
// if source bound is empty, will return an empty sourceBound and an empty source config
@@ -310,14 +333,22 @@ func deleteSourceBoundOp(worker string) clientv3.Op {
return clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Encode(worker))
}

// putSourceBoundOp returns a PUT etcd operation for the bound relationship.
// deleteLastSourceBoundOp returns a DELETE etcd operation for the last bound relationship of the specified DM-worker.
func deleteLastSourceBoundOp(worker string) clientv3.Op {
return clientv3.OpDelete(common.UpstreamLastBoundWorkerKeyAdapter.Encode(worker))
}

// putSourceBoundOp returns PUT etcd operations for the bound relationship.
// k/v: worker-name -> bound relationship.
func putSourceBoundOp(bound SourceBound) (clientv3.Op, error) {
func putSourceBoundOp(bound SourceBound) ([]clientv3.Op, error) {
value, err := bound.toJSON()
if err != nil {
return clientv3.Op{}, err
return []clientv3.Op{}, err
}
key := common.UpstreamBoundWorkerKeyAdapter.Encode(bound.Worker)
key1 := common.UpstreamBoundWorkerKeyAdapter.Encode(bound.Worker)
op1 := clientv3.OpPut(key1, value)
key2 := common.UpstreamLastBoundWorkerKeyAdapter.Encode(bound.Worker)
op2 := clientv3.OpPut(key2, value)

return clientv3.OpPut(key, value), nil
return []clientv3.Op{op1, op2}, nil
}
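Because putSourceBoundOp now returns two PUT operations, the bound key and the last-bound key are written in one etcd transaction, so readers never observe one without the other. Below is a minimal sketch of that pattern against the raw clientv3 API; the endpoint, JSON value, and hex key are assumptions for illustration, and dm actually routes these ops through etcdutil.DoOpsInOneTxnWithRetry, which (as the name suggests) retries the same kind of transaction.

```go
package main

import (
	"context"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// Assumed local endpoint, for illustration only.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Hypothetical bound relationship; the real value comes from SourceBound.toJSON().
	value := `{"source":"mysql-replica-1","worker":"dm-worker-1"}`
	hexWorker := "646d2d776f726b65722d31" // hex("dm-worker-1")
	ops := []clientv3.Op{
		// current bound: deleted again when the worker is unbound
		clientv3.OpPut("/dm-master/bound-worker/"+hexWorker, value),
		// last bound: survives unbound and later serves as a priority hint
		clientv3.OpPut("/dm-master/last-bound-worker/"+hexWorker, value),
	}

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// Both keys are committed atomically in a single transaction.
	if _, err := cli.Txn(ctx).Then(ops...).Commit(); err != nil {
		panic(err)
	}
}
```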
7 changes: 4 additions & 3 deletions pkg/ha/ops.go
@@ -33,9 +33,9 @@ func PutRelayStageSourceBound(cli *clientv3.Client, stage Stage, bound SourceBou
if err != nil {
return 0, err
}
ops := make([]clientv3.Op, 0, 2)
ops := make([]clientv3.Op, 0, 3)
ops = append(ops, ops1...)
ops = append(ops, op2)
ops = append(ops, op2...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}
@@ -48,7 +48,8 @@ func DeleteSourceCfgRelayStageSourceBound(cli *clientv3.Client, source, worker s
sourceCfgOp := deleteSourceCfgOp(source)
relayStageOp := deleteRelayStageOp(source)
sourceBoundOp := deleteSourceBoundOp(worker)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, sourceCfgOp, relayStageOp, sourceBoundOp)
lastBoundOp := deleteLastSourceBoundOp(worker)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, sourceCfgOp, relayStageOp, sourceBoundOp, lastBoundOp)
return rev, err
}

3 changes: 2 additions & 1 deletion pkg/ha/source.go
@@ -102,9 +102,10 @@ func ClearTestInfoOperation(cli *clientv3.Client) error {
clearWorkerInfo := clientv3.OpDelete(common.WorkerRegisterKeyAdapter.Path(), clientv3.WithPrefix())
clearWorkerKeepAlive := clientv3.OpDelete(common.WorkerKeepAliveKeyAdapter.Path(), clientv3.WithPrefix())
clearBound := clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix())
clearLastBound := clientv3.OpDelete(common.UpstreamLastBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix())
clearRelayStage := clientv3.OpDelete(common.StageRelayKeyAdapter.Path(), clientv3.WithPrefix())
clearSubTaskStage := clientv3.OpDelete(common.StageSubTaskKeyAdapter.Path(), clientv3.WithPrefix())
_, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clearSource, clearTask, clearSubTask, clearWorkerInfo, clearBound,
clearWorkerKeepAlive, clearRelayStage, clearSubTaskStage)
clearLastBound, clearWorkerKeepAlive, clearRelayStage, clearSubTaskStage)
return err
}
37 changes: 37 additions & 0 deletions tests/ha_cases/lib.sh
@@ -186,3 +186,40 @@ function isolate_worker() {
run_dm_worker $WORK_DIR/worker$1 $port $cur/conf/dm-worker$1.toml
export GO_FAILPOINTS=''
}

function check_bound() {
bound1=$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1:$MASTER_PORT1" list-member --name worker1 \
| grep 'source' | awk -F: '{print $2}')
bound2=$($PWD/bin/dmctl.test DEVEL --master-addr "127.0.0.1:$MASTER_PORT1" list-member --name worker2 \
| grep 'source' | awk -F: '{print $2}')
if [[ $worker1bound != $bound1 || $worker2bound != $bound2 ]]; then
echo "worker1bound $worker1bound bound1 $bound1"
echo "worker2bound $worker2bound bound2 $bound2"
exit 1
fi
}

function start_2_worker_ensure_bound() {
worker_ports_2=(0 $WORKER1_PORT $WORKER2_PORT $WORKER3_PORT $WORKER4_PORT $WORKER5_PORT)

echo "start worker$1"
run_dm_worker $WORK_DIR/worker$1 ${worker_ports_2[$1]} $cur/conf/dm-worker$1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:${worker_ports_2[$1]}
echo "start worker$2"
run_dm_worker $WORK_DIR/worker$2 ${worker_ports_2[$2]} $cur/conf/dm-worker$2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:${worker_ports_2[$2]}
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \
"list-member --name worker$1 --name worker$2" \
"\"source\": \"mysql-replica-01\"" 1 \
"\"source\": \"mysql-replica-02\"" 1
}

function kill_2_worker_ensure_unbound() {
echo "kill dm-worker$1"
ps aux | grep dm-worker$1 | awk '{print $2}' | xargs kill || true
echo "kill dm-worker$2"
ps aux | grep dm-worker$2 | awk '{print $2}' | xargs kill || true
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \
"list-member --name worker$1 --name worker$2" \
"\"source\": \"\"" 2
}